From 5668cdeb7c5fbc0457bb72afa275e5e369f12c8c Mon Sep 17 00:00:00 2001 From: SionoiS Date: Tue, 17 Dec 2024 08:36:14 -0500 Subject: [PATCH 1/5] protocols & tests --- tests/waku_store_sync/test_protocol.nim | 232 ++++++++++++++ waku/waku_store_sync/protocols_metrics.nim | 12 + waku/waku_store_sync/reconciliation.nim | 356 +++++++++++++++++++++ waku/waku_store_sync/transfer.nim | 219 +++++++++++++ 4 files changed, 819 insertions(+) create mode 100644 tests/waku_store_sync/test_protocol.nim create mode 100644 waku/waku_store_sync/protocols_metrics.nim create mode 100644 waku/waku_store_sync/reconciliation.nim create mode 100644 waku/waku_store_sync/transfer.nim diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim new file mode 100644 index 0000000000..8b3d820d13 --- /dev/null +++ b/tests/waku_store_sync/test_protocol.nim @@ -0,0 +1,232 @@ +{.used.} + +import + std/[options, sets, random, math], + testutils/unittests, + chronos, + libp2p/crypto/crypto, + stew/byteutils + +import + ../../waku/[ + node/peer_manager, + waku_core, + waku_core/message, + waku_core/message/digest, + waku_store_sync, + waku_store_sync/storage/range_processing, + ], + ../testlib/[wakucore, testasync], + ./sync_utils + +suite "Waku Sync: 2 nodes recon": + var serverSwitch {.threadvar.}: Switch + var clientSwitch {.threadvar.}: Switch + + var + idsChannel {.threadvar.}: AsyncQueue[ID] + wantsChannel {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + needsChannel {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + + var server {.threadvar.}: SyncReconciliation + var client {.threadvar.}: SyncReconciliation + + var serverPeerInfo {.threadvar.}: RemotePeerInfo + var clientPeerInfo {.threadvar.}: RemotePeerInfo + + asyncSetup: + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + idsChannel = newAsyncQueue[ID]() + wantsChannel = newAsyncQueue[(PeerId, Fingerprint)]() + needsChannel = newAsyncQueue[(PeerId, Fingerprint)]() + + server = + await newTestWakuRecon(serverSwitch, idsChannel, wantsChannel, needsChannel) + client = + await newTestWakuRecon(clientSwitch, idsChannel, wantsChannel, needsChannel) + + serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() + + asyncTeardown: + await sleepAsync(10.milliseconds) + + await allFutures(server.stop(), client.stop()) + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "sync 2 nodes both empty": + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), res.error + + check: + idsChannel.len == 0 + wantsChannel.len == 0 + needsChannel.len == 0 + + asyncTest "sync 2 nodes empty client full server": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) + + server.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + server.messageIngress(hash3, msg3) + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), res.error + + check: + needsChannel.contains((clientPeerInfo.peerId, hash1)) == true + 
needsChannel.contains((clientPeerInfo.peerId, hash2)) == true + needsChannel.contains((clientPeerInfo.peerId, hash3)) == true + + asyncTest "sync 2 nodes full client empty server": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg3) + + client.messageIngress(hash1, msg1) + client.messageIngress(hash2, msg2) + client.messageIngress(hash3, msg3) + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), res.error + + check: + needsChannel.contains((serverPeerInfo.peerId, hash1)) == true + needsChannel.contains((serverPeerInfo.peerId, hash2)) == true + needsChannel.contains((serverPeerInfo.peerId, hash3)) == true + + asyncTest "sync 2 nodes different hashes": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + msg3 = fakeWakuMessage(ts = now() + 2, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(DefaultPubsubTopic, msg2) + hash3 = computeMessageHash(DefaultPubsubTopic, msg3) + + server.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + client.messageIngress(hash1, msg1) + client.messageIngress(hash3, msg3) + + var syncRes = await client.storeSynchronization(some(serverPeerInfo)) + assert syncRes.isOk(), $syncRes.error + + check: + needsChannel.contains((serverPeerInfo.peerId, hash3)) + needsChannel.contains((clientPeerInfo.peerId, hash2)) + wantsChannel.contains((clientPeerInfo.peerId, hash3)) + wantsChannel.contains((serverPeerInfo.peerId, hash2)) + + asyncTest "sync 2 nodes same hashes": + let + msg1 = fakeWakuMessage(ts = now(), contentTopic = DefaultContentTopic) + msg2 = fakeWakuMessage(ts = now() + 1, contentTopic = DefaultContentTopic) + hash1 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg1) + hash2 = computeMessageHash(pubsubTopic = DefaultPubsubTopic, msg2) + + server.messageIngress(hash1, msg1) + client.messageIngress(hash1, msg1) + server.messageIngress(hash2, msg2) + client.messageIngress(hash2, msg2) + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check: + wantsChannel.len == 0 + needsChannel.len == 0 + + asyncTest "sync 2 nodes 100K msgs 1 diff": + let msgCount = 100_000 + var diffIndex = rand(msgCount) + var diff: WakuMessageHash + + # the sync window is 1 hour, spread msg equally in that time + let timeSlice = calculateTimeRange() + let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) + let (part, _) = divmod(timeWindow, 100_000) + + var timestamp = timeSlice.a + + for i in 0 ..< msgCount: + let msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) + let hash = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(hash, msg) + + if i != diffIndex: + client.messageIngress(hash, msg) + else: + diff = hash + + timestamp += Timestamp(part) + continue + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + check: + wantsChannel.contains((serverPeerInfo.peerId, Fingerprint(diff))) == true + 
needsChannel.contains((clientPeerInfo.peerId, Fingerprint(diff))) == true + + asyncTest "sync 2 nodes 10K msgs 1K diffs": + let msgCount = 10_000 + var diffCount = 1_000 + + var diffMsgHashes: HashSet[WakuMessageHash] + var randIndexes: HashSet[int] + + # Diffs + for i in 0 ..< diffCount: + var randInt = rand(0 ..< msgCount) + + #make sure we actually have the right number of diffs + while randInt in randIndexes: + randInt = rand(0 ..< msgCount) + + randIndexes.incl(randInt) + + # sync window is 1 hour, spread msg equally in that time + let timeSlice = calculateTimeRange() + let timeWindow = int64(timeSlice.b) - int64(timeSlice.a) + let (part, _) = divmod(timeWindow, 100_000) + + var timestamp = timeSlice.a + + for i in 0 ..< msgCount: + let + msg = fakeWakuMessage(ts = timestamp, contentTopic = DefaultContentTopic) + hash = computeMessageHash(DefaultPubsubTopic, msg) + + server.messageIngress(hash, msg) + + if i in randIndexes: + diffMsgHashes.incl(hash) + else: + client.messageIngress(hash, msg) + + timestamp += Timestamp(part) + continue + + let res = await client.storeSynchronization(some(serverPeerInfo)) + assert res.isOk(), $res.error + + # timimg issue make it hard to match exact numbers + check: + wantsChannel.len > 900 + needsChannel.len > 900 diff --git a/waku/waku_store_sync/protocols_metrics.nim b/waku/waku_store_sync/protocols_metrics.nim new file mode 100644 index 0000000000..940e5835cd --- /dev/null +++ b/waku/waku_store_sync/protocols_metrics.nim @@ -0,0 +1,12 @@ +import metrics + +declarePublicHistogram reconciliation_roundtrips, + "the nubmer of roundtrips for each reconciliation", + buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0] + +declarePublicSummary total_bytes_exchanged, + "the number of bytes sent and received by the protocols", + ["transfer_sent", "transfer_recv", "reconciliation_sent", "reconciliation_recv"] + +declarePublicCounter total_messages_exchanged, + "the number of messages sent and received by the transfer protocol", ["sent", "recv"] diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim new file mode 100644 index 0000000000..d86c3cd619 --- /dev/null +++ b/waku/waku_store_sync/reconciliation.nim @@ -0,0 +1,356 @@ +{.push raises: [].} + +import + std/sequtils, + stew/byteutils, + results, + chronicles, + chronos, + metrics, + libp2p/utility, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/protobuf, + ../common/paging, + ../waku_enr, + ../waku_core, + ../node/peer_manager/peer_manager, + ../waku_archive, + ./common, + ./codec, + ./storage/storage, + ./storage/seq_storage, + ./storage/range_processing, + ./protocols_metrics + +logScope: + topics = "waku reconciliation" + +type SyncReconciliation* = ref object of LPProtocol + peerManager: PeerManager + + wakuArchive: WakuArchive + + storage: SyncStorage + + # Receive IDs from transfer protocol for storage + idsRx: AsyncQueue[ID] + + # Send Hashes to transfer protocol for reception + wantsTx: AsyncQueue[(PeerId, Fingerprint)] + + # Send Hashes to transfer protocol for transmission + needsTx: AsyncQueue[(PeerId, Fingerprint)] + + # params + syncInterval: timer.Duration # Time between each syncronisation attempt + syncRange: timer.Duration # Amount of time in the past to sync + relayJitter: Duration # Amount of time since the present to ignore when syncing + + # futures + periodicSyncFut: Future[void] + periodicPruneFut: Future[void] + idsReceiverFut: Future[void] + +proc 
messageIngress*( + self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage +) = + let msgHash = computeMessageHash(pubsubTopic, msg) + + let id = ID(time: msg.timestamp, fingerprint: msgHash) + + self.storage.insert(id).isOkOr: + error "failed to insert new message", hash = msgHash.toHex(), err = error + +proc messageIngress*( + self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage +) = + let id = ID(time: msg.timestamp, fingerprint: msgHash) + + self.storage.insert(id).isOkOr: + error "failed to insert new message", hash = msgHash.toHex(), err = error + +proc messageIngress*(self: SyncReconciliation, id: ID) = + self.storage.insert(id).isOkOr: + error "failed to insert new message", hash = id.fingerprint.toHex(), err = error + +proc processRequest( + self: SyncReconciliation, conn: Connection +): Future[Result[void, string]] {.async.} = + var roundTrips = 0 + + while true: + let readRes = catch: + await conn.readLp(-1) + + let buffer: seq[byte] = readRes.valueOr: + return err("connection read error: " & error.msg) + + total_bytes_exchanged.observe(buffer.len, labelValues = ["reconciliation_recv"]) + + let recvPayload = SyncPayload.deltaDecode(buffer) + + roundTrips.inc() + + trace "sync payload received", + local = self.peerManager.switch.peerInfo.peerId, + remote = conn.peerId, + payload = recvPayload + + if recvPayload.ranges.len == 0 or + recvPayload.ranges.allIt(it[1] == RangeType.skipRange): + break + + var + hashToRecv: seq[Fingerprint] + hashToSend: seq[Fingerprint] + + let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) + + for hash in hashToSend: + await self.needsTx.addLast((conn.peerId, hash)) + + for hash in hashToRecv: + await self.wantstx.addLast((conn.peerId, hash)) + + let rawPayload = sendPayload.deltaEncode() + + total_bytes_exchanged.observe(rawPayload.len, labelValues = ["reconciliation_sent"]) + + let writeRes = catch: + await conn.writeLP(rawPayload) + + if writeRes.isErr(): + return err("connection write error: " & writeRes.error.msg) + + trace "sync payload sent", + local = self.peerManager.switch.peerInfo.peerId, + remote = conn.peerId, + payload = sendPayload + + if sendPayload.ranges.len == 0 or + sendPayload.ranges.allIt(it[1] == RangeType.skipRange): + break + + continue + + reconciliation_roundtrips.observe(roundTrips) + + await conn.close() + + return ok() + +proc initiate( + self: SyncReconciliation, connection: Connection +): Future[Result[void, string]] {.async.} = + let + timeRange = calculateTimeRange(self.relayJitter, self.syncRange) + lower = ID(time: timeRange.a, fingerprint: EmptyFingerprint) + upper = ID(time: timeRange.b, fingerprint: FullFingerprint) + bounds = lower .. 
upper + + fingerprint = self.storage.fingerprinting(bounds) + initPayload = SyncPayload( + ranges: @[(bounds, fingerprintRange)], fingerprints: @[fingerprint], itemSets: @[] + ) + + let sendPayload = initPayload.deltaEncode() + + total_bytes_exchanged.observe(sendPayload.len, labelValues = ["reconciliation_sent"]) + + let writeRes = catch: + await connection.writeLP(sendPayload) + + if writeRes.isErr(): + return err("connection write error: " & writeRes.error.msg) + + trace "sync payload sent", + local = self.peerManager.switch.peerInfo.peerId, + remote = connection.peerId, + payload = sendPayload + + ?await self.processRequest(connection) + + return ok() + +proc storeSynchronization*( + self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) +): Future[Result[void, string]] {.async.} = + let peer = peerInfo.valueOr: + self.peerManager.selectPeer(SyncReconciliationCodec).valueOr: + return err("no suitable peer found for sync") + + let connOpt = await self.peerManager.dialPeer(peer, SyncReconciliationCodec) + + let conn: Connection = connOpt.valueOr: + return err("cannot establish sync connection") + + debug "sync session initialized", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + (await self.initiate(conn)).isOkOr: + error "sync session failed", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId, err = error + + return err("sync request error: " & error) + + debug "sync session ended gracefully", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + return ok() + +proc initFillStorage( + syncRange: timer.Duration, wakuArchive: WakuArchive +): Future[Result[seq[ID], string]] {.async.} = + if wakuArchive.isNil(): + return err("waku archive unavailable") + + let endTime = getNowInNanosecondTime() + let starTime = endTime - syncRange.nanos + + #TODO special query for only timestap and hash ??? 
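+  # Page forward through the archive in batches of `pageSize` messages;
+  # only each message's timestamp and hash are kept for the sync storage below.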
+ + var query = ArchiveQuery( + includeData: true, + cursor: none(ArchiveCursor), + startTime: some(starTime), + endTime: some(endTime), + pageSize: 100, + direction: PagingDirection.FORWARD, + ) + + debug "initial storage filling started" + + var ids = newSeq[ID](50_000) + + # we assume IDs are in order + + while true: + let response = (await wakuArchive.findMessages(query)).valueOr: + return err("archive retrival failed: " & $error) + + for i in 0 ..< response.hashes.len: + let hash = response.hashes[i] + let msg = response.messages[i] + + ids.add(ID(time: msg.timestamp, fingerprint: hash)) + + if response.cursor.isNone(): + break + + query.cursor = response.cursor + + debug "initial storage filling done", elements = ids.len + + return ok(ids) + +proc new*( + T: type SyncReconciliation, + peerManager: PeerManager, + wakuArchive: WakuArchive, + syncRange: timer.Duration = DefaultSyncRange, + syncInterval: timer.Duration = DefaultSyncInterval, + relayJitter: timer.Duration = DefaultGossipSubJitter, + idsRx: AsyncQueue[ID], + wantsTx: AsyncQueue[(PeerId, Fingerprint)], + needsTx: AsyncQueue[(PeerId, Fingerprint)], +): Future[Result[T, string]] {.async.} = + let res = await initFillStorage(syncRange, wakuArchive) + let storage = + if res.isErr(): + warn "will not sync messages before this point in time", error = res.error + SeqStorage.new(50_000) + else: + SeqStorage.new(res.get()) + + var sync = SyncReconciliation( + peerManager: peerManager, + storage: storage, + syncRange: syncRange, + syncInterval: syncInterval, + relayJitter: relayJitter, + idsRx: idsRx, + wantsTx: wantsTx, + needsTx: needsTx, + ) + + let handler = proc(conn: Connection, proto: string) {.async, closure.} = + (await sync.processRequest(conn)).isOkOr: + error "request processing error", error = error + + return + + sync.handler = handler + sync.codec = SyncReconciliationCodec + + info "Store Reconciliation protocol initialized" + + return ok(sync) + +proc periodicSync(self: SyncReconciliation) {.async.} = + debug "periodic sync initialized", interval = $self.syncInterval + + while true: # infinite loop + await sleepAsync(self.syncInterval) + + debug "periodic sync started" + + (await self.storeSynchronization()).isOkOr: + error "periodic sync failed", err = error + continue + + debug "periodic sync done" + +proc periodicPrune(self: SyncReconciliation) {.async.} = + debug "periodic prune initialized", interval = $self.syncInterval + + # to stagger the sync/prune intervals + await sleepAsync((self.syncInterval div 2)) + + while true: # infinite loop + await sleepAsync(self.syncInterval) + + debug "periodic prune started" + + let time = getNowInNanosecondTime() - self.syncRange.nanos + + let count = self.storage.prune(time) + + debug "periodic prune done", elementsTrimmed = count + +proc idsReceiverLoop(self: SyncReconciliation) {.async.} = + while true: # infinite loop + let id = await self.idsRx.popfirst() + + self.messageIngress(id) + +proc start*(self: SyncReconciliation) = + if self.started: + return + + self.started = true + + if self.syncInterval > ZeroDuration: + self.periodicSyncFut = self.periodicSync() + + if self.syncInterval > ZeroDuration: + self.periodicPruneFut = self.periodicPrune() + + self.idsReceiverFut = self.idsReceiverLoop() + + info "Store Sync Reconciliation protocol started" + +proc stopWait*(self: SyncReconciliation) {.async.} = + if self.syncInterval > ZeroDuration: + await self.periodicSyncFut.cancelAndWait() + + if self.syncInterval > ZeroDuration: + await self.periodicPruneFut.cancelAndWait() + 
+ await self.idsReceiverFut.cancelAndWait() + + info "Store Sync Reconciliation protocol stopped" diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim new file mode 100644 index 0000000000..9ae5a774ed --- /dev/null +++ b/waku/waku_store_sync/transfer.nim @@ -0,0 +1,219 @@ +{.push raises: [].} + +import + std/sets, + results, + chronicles, + chronos, + metrics, + libp2p/utility, + libp2p/protocols/protocol, + libp2p/stream/connection, + libp2p/crypto/crypto, + eth/p2p/discoveryv5/enr +import + ../common/nimchronos, + ../common/protobuf, + ../waku_enr, + ../waku_core, + ../node/peer_manager/peer_manager, + ../waku_archive, + ../waku_archive/common, + ./common, + ./codec, + ./protocols_metrics + +logScope: + topics = "waku transfer" + +type SyncTransfer* = ref object of LPProtocol + wakuArchive: WakuArchive + peerManager: PeerManager + + # Send IDs to reconciliation protocol for storage + idsTx: AsyncQueue[ID] + + # Receive Hashes from reconciliation protocol for reception + wantsRx: AsyncQueue[(PeerId, Fingerprint)] + wantsRxFut: Future[void] + inSessions: Table[PeerId, HashSet[WakuMessageHash]] + + # Receive Hashes from reconciliation protocol for transmission + needsRx: AsyncQueue[(PeerId, Fingerprint)] + needsRxFut: Future[void] + outSessions: Table[PeerId, Connection] + +proc sendMessage( + conn: Connection, payload: WakuMessagePayload +): Future[Result[void, string]] {.async.} = + let rawPayload = payload.encode().buffer + + total_bytes_exchanged.observe(rawPayload.len, labelValues = ["transfer_sent"]) + + let writeRes = catch: + await conn.writeLP(rawPayload) + + if writeRes.isErr(): + return err("connection write error: " & writeRes.error.msg) + + total_messages_exchanged.inc(labelValues = ["sent"]) + + return ok() + +proc openConnection( + self: SyncTransfer, peerId: PeerId +): Future[Result[Connection, string]] {.async.} = + let connOpt = await self.peerManager.dialPeer(peerId, SyncTransferCodec) + + let conn: Connection = connOpt.valueOr: + return err("Cannot establish transfer connection") + + debug "transfer session initialized", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + return ok(conn) + +proc wantsReceiverLoop(self: SyncTransfer) {.async.} = + ## Waits for message hashes, + ## store the peers and hashes locally as + ## "supposed to be received" + + while true: # infinite loop + let (peerId, fingerprint) = await self.wantsRx.popFirst() + + self.inSessions.withValue(peerId, value): + value[].incl(fingerprint) + do: + var hashes = initHashSet[WakuMessageHash]() + hashes.incl(fingerprint) + self.inSessions[peerId] = hashes + + return + +proc needsReceiverLoop(self: SyncTransfer) {.async.} = + ## Waits for message hashes, + ## open connection to the other peers, + ## get the messages from DB and then send them. 
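+  ## Connections are cached in `outSessions` and reused for
+  ## subsequent hashes destined to the same peer.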
+ + while true: # infinite loop + let (peerId, fingerprint) = await self.needsRx.popFirst() + + if not self.outSessions.hasKey(peerId): + let connection = (await self.openConnection(peerId)).valueOr: + error "failed to establish transfer connection", error = error + continue + + self.outSessions[peerid] = connection + + let connection = self.outSessions[peerId] + + var query = ArchiveQuery() + query.includeData = true + query.hashes = @[fingerprint] + + let response = (await self.wakuArchive.findMessages(query)).valueOr: + error "failed to query archive", error = error + continue + + let msg = + WakuMessagePayload(pubsub: response.topics[0], message: response.messages[0]) + + (await sendMessage(connection, msg)).isOkOr: + error "failed to send message", error = error + continue + + return + +proc initProtocolHandler(self: SyncTransfer) = + let handler = proc(conn: Connection, proto: string) {.async, closure.} = + while true: + let readRes = catch: + await conn.readLp(int64(DefaultMaxWakuMessageSize)) + + let buffer: seq[byte] = readRes.valueOr: + # connection closed normally + break + + total_bytes_exchanged.observe(buffer.len, labelValues = ["transfer_recv"]) + + let payload = WakuMessagePayload.decode(buffer).valueOr: + error "decoding error", error = $error + continue + + total_messages_exchanged.inc(labelValues = ["recv"]) + + let msg = payload.message + let pubsub = payload.pubsub + + let hash = computeMessageHash(pubsub, msg) + + self.inSessions.withValue(conn.peerId, value): + if value[].missingOrExcl(hash): + error "unwanted hash received, disconnecting" + self.inSessions.del(conn.peerId) + await conn.close() + break + do: + error "unwanted hash received, disconnecting" + self.inSessions.del(conn.peerId) + await conn.close() + break + + #TODO verify msg RLN proof... 
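+      # the hash was matched against this peer's expected set above; archive
+      # the message and forward its ID to the reconciliation protocol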
+ + (await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr: + continue + + let id = Id(time: msg.timestamp, fingerprint: hash) + await self.idsTx.addLast(id) + + continue + + debug "transfer session ended", + local = self.peerManager.switch.peerInfo.peerId, remote = conn.peerId + + return + + self.handler = handler + self.codec = SyncTransferCodec + +proc new*( + T: type SyncTransfer, + peerManager: PeerManager, + wakuArchive: WakuArchive, + idsTx: AsyncQueue[ID], + wantsRx: AsyncQueue[(PeerId, Fingerprint)], + needsRx: AsyncQueue[(PeerId, Fingerprint)], +): T = + var transfer = SyncTransfer( + peerManager: peerManager, + wakuArchive: wakuArchive, + idsTx: idsTx, + wantsRx: wantsRx, + needsRx: needsRx, + ) + + transfer.initProtocolHandler() + + info "Store Transfer protocol initialized" + + return transfer + +proc start*(self: SyncTransfer) = + if self.started: + return + + self.started = true + + self.wantsRxFut = self.wantsReceiverLoop() + self.needsRxFut = self.needsReceiverLoop() + + info "Store Sync Transfer protocol started" + +proc stopWait*(self: SyncTransfer) {.async.} = + self.started = false + + await self.wantsRxFut.cancelAndWait() + await self.needsRxFut.cancelAndWait() + + info "Store Sync Transfer protocol stopped" From 27a48fb765e4d56da780b97a9d6aeca171a633bc Mon Sep 17 00:00:00 2001 From: SionoiS Date: Wed, 8 Jan 2025 17:01:49 -0500 Subject: [PATCH 2/5] transfer test --- tests/waku_store_sync/test_protocol.nim | 113 +++++++++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index 8b3d820d13..2703f74537 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -17,9 +17,10 @@ import waku_store_sync/storage/range_processing, ], ../testlib/[wakucore, testasync], + ../waku_archive/archive_utils, ./sync_utils -suite "Waku Sync: 2 nodes recon": +suite "Waku Sync: reconciliation": var serverSwitch {.threadvar.}: Switch var clientSwitch {.threadvar.}: Switch @@ -230,3 +231,113 @@ suite "Waku Sync: 2 nodes recon": check: wantsChannel.len > 900 needsChannel.len > 900 + +suite "Waku Sync: transfer": + var + serverSwitch {.threadvar.}: Switch + clientSwitch {.threadvar.}: Switch + + var + serverArchive {.threadvar.}: WakuArchive + clientArchive {.threadvar.}: WakuArchive + + var + serverIds {.threadvar.}: AsyncQueue[ID] + serverWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + serverNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + clientIds {.threadvar.}: AsyncQueue[ID] + clientWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + clientNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + + var + server {.threadvar.}: SyncTransfer + client {.threadvar.}: SyncTransfer + + var + serverPeerInfo {.threadvar.}: RemotePeerInfo + clientPeerInfo {.threadvar.}: RemotePeerInfo + + asyncSetup: + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let serverDriver = newSqliteArchiveDriver() + let clientDriver = newSqliteArchiveDriver() + + serverArchive = newWakuArchive(serverDriver) + clientArchive = newWakuArchive(clientDriver) + + let + serverPeerManager = PeerManager.new(serverSwitch) + clientPeerManager = PeerManager.new(clientSwitch) + + serverIds = newAsyncQueue[ID]() + serverWants = newAsyncQueue[(PeerId, Fingerprint)]() + serverNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + + server = SyncTransfer.new( + peerManager = 
serverPeerManager, + wakuArchive = serverArchive, + idsTx = serverIds, + wantsRx = serverWants, + needsRx = serverNeeds, + ) + + clientIds = newAsyncQueue[ID]() + clientWants = newAsyncQueue[(PeerId, Fingerprint)]() + clientNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + + client = SyncTransfer.new( + peerManager = clientPeerManager, + wakuArchive = clientArchive, + idsTx = clientIds, + wantsRx = clientWants, + needsRx = clientNeeds, + ) + + server.start() + client.start() + + serverSwitch.mount(server) + clientSwitch.mount(client) + + serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() + clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() + + asyncTeardown: + await sleepAsync(10.milliseconds) + + await allFutures(server.stopWait(), client.stopWait()) + await allFutures(serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "transfer 1 message": + let msg = fakeWakuMessage() + let hash = computeMessageHash(DefaultPubsubTopic, msg) + let msgs = @[msg] + + serverArchive.put(DefaultPubsubTopic, msgs) + + # add server info and msg hash to client want channel + let want = (serverPeerInfo.peerId, hash) + clientWants.add(want) + + # add client info and msg hash to server need channel + let need = (clientPeerInfo.peerId, hash) + serverNeeds.add(need) + + # give time for transfer to happen + await sleepAsync(10.miliseconds) + + var query = ArchiveQuery() + query.includeData = true + query.hashes = @[hash] + + let res = await clientArchive.findMessages(query) + assert res.isOk(), $res.error + + let recvMsg = response.messages[0] + + check: + msg == recvMsg From 925f40cf0376b19360dfe93d44408316da124b9f Mon Sep 17 00:00:00 2001 From: SionoiS Date: Fri, 10 Jan 2025 11:09:08 -0500 Subject: [PATCH 3/5] fixes --- tests/waku_store_sync/test_protocol.nim | 114 +++++++++++++-------- waku/waku_store_sync/protocols_metrics.nim | 13 ++- waku/waku_store_sync/reconciliation.nim | 24 +++-- waku/waku_store_sync/transfer.nim | 8 +- 4 files changed, 98 insertions(+), 61 deletions(-) diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index 2703f74537..f1939438f1 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -26,8 +26,8 @@ suite "Waku Sync: reconciliation": var idsChannel {.threadvar.}: AsyncQueue[ID] - wantsChannel {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - needsChannel {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + localWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] var server {.threadvar.}: SyncReconciliation var client {.threadvar.}: SyncReconciliation @@ -42,31 +42,32 @@ suite "Waku Sync: reconciliation": await allFutures(serverSwitch.start(), clientSwitch.start()) idsChannel = newAsyncQueue[ID]() - wantsChannel = newAsyncQueue[(PeerId, Fingerprint)]() - needsChannel = newAsyncQueue[(PeerId, Fingerprint)]() + localWants = newAsyncQueue[(PeerId, Fingerprint)]() + remoteNeeds = newAsyncQueue[(PeerId, Fingerprint)]() - server = - await newTestWakuRecon(serverSwitch, idsChannel, wantsChannel, needsChannel) - client = - await newTestWakuRecon(clientSwitch, idsChannel, wantsChannel, needsChannel) + server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) + client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() asyncTeardown: - await sleepAsync(10.milliseconds) 
- await allFutures(server.stop(), client.stop()) await allFutures(serverSwitch.stop(), clientSwitch.stop()) asyncTest "sync 2 nodes both empty": + check: + idsChannel.len == 0 + localWants.len == 0 + remoteNeeds.len == 0 + let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), res.error check: idsChannel.len == 0 - wantsChannel.len == 0 - needsChannel.len == 0 + localWants.len == 0 + remoteNeeds.len == 0 asyncTest "sync 2 nodes empty client full server": let @@ -81,13 +82,18 @@ suite "Waku Sync: reconciliation": server.messageIngress(hash2, msg2) server.messageIngress(hash3, msg3) + check: + remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == false + let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), res.error check: - needsChannel.contains((clientPeerInfo.peerId, hash1)) == true - needsChannel.contains((clientPeerInfo.peerId, hash2)) == true - needsChannel.contains((clientPeerInfo.peerId, hash3)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash1)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash3)) == true asyncTest "sync 2 nodes full client empty server": let @@ -102,13 +108,18 @@ suite "Waku Sync: reconciliation": client.messageIngress(hash2, msg2) client.messageIngress(hash3, msg3) + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == false + remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == false + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false + let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), res.error check: - needsChannel.contains((serverPeerInfo.peerId, hash1)) == true - needsChannel.contains((serverPeerInfo.peerId, hash2)) == true - needsChannel.contains((serverPeerInfo.peerId, hash3)) == true + remoteNeeds.contains((serverPeerInfo.peerId, hash1)) == true + remoteNeeds.contains((serverPeerInfo.peerId, hash2)) == true + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true asyncTest "sync 2 nodes different hashes": let @@ -124,14 +135,20 @@ suite "Waku Sync: reconciliation": client.messageIngress(hash1, msg1) client.messageIngress(hash3, msg3) + check: + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == false + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == false + localWants.contains((clientPeerInfo.peerId, hash3)) == false + localWants.contains((serverPeerInfo.peerId, hash2)) == false + var syncRes = await client.storeSynchronization(some(serverPeerInfo)) assert syncRes.isOk(), $syncRes.error check: - needsChannel.contains((serverPeerInfo.peerId, hash3)) - needsChannel.contains((clientPeerInfo.peerId, hash2)) - wantsChannel.contains((clientPeerInfo.peerId, hash3)) - wantsChannel.contains((serverPeerInfo.peerId, hash2)) + remoteNeeds.contains((serverPeerInfo.peerId, hash3)) == true + remoteNeeds.contains((clientPeerInfo.peerId, hash2)) == true + localWants.contains((clientPeerInfo.peerId, hash3)) == true + localWants.contains((serverPeerInfo.peerId, hash2)) == true asyncTest "sync 2 nodes same hashes": let @@ -145,12 +162,16 @@ suite "Waku Sync: reconciliation": server.messageIngress(hash2, msg2) client.messageIngress(hash2, msg2) + check: + localWants.len == 0 + remoteNeeds.len == 0 + let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error check: - wantsChannel.len == 0 - 
needsChannel.len == 0 + localWants.len == 0 + remoteNeeds.len == 0 asyncTest "sync 2 nodes 100K msgs 1 diff": let msgCount = 100_000 @@ -176,14 +197,17 @@ suite "Waku Sync: reconciliation": diff = hash timestamp += Timestamp(part) - continue + + check: + localWants.contains((serverPeerInfo.peerId, Fingerprint(diff))) == false + remoteNeeds.contains((clientPeerInfo.peerId, Fingerprint(diff))) == false let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error check: - wantsChannel.contains((serverPeerInfo.peerId, Fingerprint(diff))) == true - needsChannel.contains((clientPeerInfo.peerId, Fingerprint(diff))) == true + localWants.contains((serverPeerInfo.peerId, Fingerprint(diff))) == true + remoteNeeds.contains((clientPeerInfo.peerId, Fingerprint(diff))) == true asyncTest "sync 2 nodes 10K msgs 1K diffs": let msgCount = 10_000 @@ -224,13 +248,17 @@ suite "Waku Sync: reconciliation": timestamp += Timestamp(part) continue + check: + localWants.len == 0 + remoteNeeds.len == 0 + let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error # timimg issue make it hard to match exact numbers check: - wantsChannel.len > 900 - needsChannel.len > 900 + localWants.len > 900 + remoteNeeds.len > 900 suite "Waku Sync: transfer": var @@ -243,11 +271,11 @@ suite "Waku Sync: transfer": var serverIds {.threadvar.}: AsyncQueue[ID] - serverWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - serverNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] clientIds {.threadvar.}: AsyncQueue[ID] - clientWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - clientNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] var server {.threadvar.}: SyncTransfer @@ -274,27 +302,27 @@ suite "Waku Sync: transfer": clientPeerManager = PeerManager.new(clientSwitch) serverIds = newAsyncQueue[ID]() - serverWants = newAsyncQueue[(PeerId, Fingerprint)]() - serverNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + serverLocalWants = newAsyncQueue[(PeerId, Fingerprint)]() + serverRemoteNeeds = newAsyncQueue[(PeerId, Fingerprint)]() server = SyncTransfer.new( peerManager = serverPeerManager, wakuArchive = serverArchive, idsTx = serverIds, - wantsRx = serverWants, - needsRx = serverNeeds, + wantsRx = serverLocalWants, + needsRx = serverRemoteNeeds, ) clientIds = newAsyncQueue[ID]() - clientWants = newAsyncQueue[(PeerId, Fingerprint)]() - clientNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + clientLocalWants = newAsyncQueue[(PeerId, Fingerprint)]() + clientRemoteNeeds = newAsyncQueue[(PeerId, Fingerprint)]() client = SyncTransfer.new( peerManager = clientPeerManager, wakuArchive = clientArchive, idsTx = clientIds, - wantsRx = clientWants, - needsRx = clientNeeds, + wantsRx = clientLocalWants, + needsRx = clientRemoteNeeds, ) server.start() @@ -321,11 +349,11 @@ suite "Waku Sync: transfer": # add server info and msg hash to client want channel let want = (serverPeerInfo.peerId, hash) - clientWants.add(want) + clientLocalWants.add(want) # add client info and msg hash to server need channel let need = (clientPeerInfo.peerId, hash) - serverNeeds.add(need) + serverRemoteNeeds.add(need) # give time for transfer to happen await sleepAsync(10.miliseconds) diff --git 
a/waku/waku_store_sync/protocols_metrics.nim b/waku/waku_store_sync/protocols_metrics.nim index 940e5835cd..64909637d8 100644 --- a/waku/waku_store_sync/protocols_metrics.nim +++ b/waku/waku_store_sync/protocols_metrics.nim @@ -1,12 +1,19 @@ import metrics +type ProtoDirection* {.pure.} = enum + ReconRecv = "reconciliation_recv" + ReconSend = "reconciliation_sent" + TransfRecv = "transfer_recv" + TransfSend = "transfer_sent" + declarePublicHistogram reconciliation_roundtrips, "the nubmer of roundtrips for each reconciliation", - buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0] + buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf] declarePublicSummary total_bytes_exchanged, "the number of bytes sent and received by the protocols", ["transfer_sent", "transfer_recv", "reconciliation_sent", "reconciliation_recv"] -declarePublicCounter total_messages_exchanged, - "the number of messages sent and received by the transfer protocol", ["sent", "recv"] +declarePublicCounter total_transfer_messages_exchanged, + "the number of messages sent and received by the transfer protocol", + ["transfer_sent", "transfer_recv"] diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index d86c3cd619..4e2a7237b1 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -30,6 +30,8 @@ import logScope: topics = "waku reconciliation" +const DefaultStorageCap = 50_000 + type SyncReconciliation* = ref object of LPProtocol peerManager: PeerManager @@ -64,7 +66,7 @@ proc messageIngress*( let id = ID(time: msg.timestamp, fingerprint: msgHash) self.storage.insert(id).isOkOr: - error "failed to insert new message", hash = msgHash.toHex(), err = error + error "failed to insert new message", msg_hash = msgHash.toHex(), err = error proc messageIngress*( self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage @@ -72,11 +74,11 @@ proc messageIngress*( let id = ID(time: msg.timestamp, fingerprint: msgHash) self.storage.insert(id).isOkOr: - error "failed to insert new message", hash = msgHash.toHex(), err = error + error "failed to insert new message", msg_hash = msgHash.toHex(), err = error proc messageIngress*(self: SyncReconciliation, id: ID) = self.storage.insert(id).isOkOr: - error "failed to insert new message", hash = id.fingerprint.toHex(), err = error + error "failed to insert new message", msg_hash = id.fingerprint.toHex(), err = error proc processRequest( self: SyncReconciliation, conn: Connection @@ -85,12 +87,12 @@ proc processRequest( while true: let readRes = catch: - await conn.readLp(-1) + await conn.readLp(int.high) let buffer: seq[byte] = readRes.valueOr: return err("connection read error: " & error.msg) - total_bytes_exchanged.observe(buffer.len, labelValues = ["reconciliation_recv"]) + total_bytes_exchanged.observe(buffer.len, labelValues = [ReconRecv]) let recvPayload = SyncPayload.deltaDecode(buffer) @@ -119,7 +121,7 @@ proc processRequest( let rawPayload = sendPayload.deltaEncode() - total_bytes_exchanged.observe(rawPayload.len, labelValues = ["reconciliation_sent"]) + total_bytes_exchanged.observe(rawPayload.len, labelValues = [ReconSend]) let writeRes = catch: await conn.writeLP(rawPayload) @@ -160,7 +162,7 @@ proc initiate( let sendPayload = initPayload.deltaEncode() - total_bytes_exchanged.observe(sendPayload.len, labelValues = ["reconciliation_sent"]) + total_bytes_exchanged.observe(sendPayload.len, labelValues = [ReconSend]) let writeRes = catch: await connection.writeLP(sendPayload) @@ -225,7 +227,7 @@ proc 
initFillStorage( debug "initial storage filling started" - var ids = newSeq[ID](50_000) + var ids = newSeq[ID](DefaultStorageCap) # we assume IDs are in order @@ -263,7 +265,7 @@ proc new*( let storage = if res.isErr(): warn "will not sync messages before this point in time", error = res.error - SeqStorage.new(50_000) + SeqStorage.new(DefaultStorageCap) else: SeqStorage.new(res.get()) @@ -308,7 +310,7 @@ proc periodicSync(self: SyncReconciliation) {.async.} = proc periodicPrune(self: SyncReconciliation) {.async.} = debug "periodic prune initialized", interval = $self.syncInterval - # to stagger the sync/prune intervals + # preventing sync and prune loops of happening at the same time. await sleepAsync((self.syncInterval div 2)) while true: # infinite loop @@ -320,7 +322,7 @@ proc periodicPrune(self: SyncReconciliation) {.async.} = let count = self.storage.prune(time) - debug "periodic prune done", elementsTrimmed = count + debug "periodic prune done", elements_pruned = count proc idsReceiverLoop(self: SyncReconciliation) {.async.} = while true: # infinite loop diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 9ae5a774ed..0e5f0332f6 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -48,7 +48,7 @@ proc sendMessage( ): Future[Result[void, string]] {.async.} = let rawPayload = payload.encode().buffer - total_bytes_exchanged.observe(rawPayload.len, labelValues = ["transfer_sent"]) + total_bytes_exchanged.observe(rawPayload.len, labelValues = [TransfSend]) let writeRes = catch: await conn.writeLP(rawPayload) @@ -56,7 +56,7 @@ proc sendMessage( if writeRes.isErr(): return err("connection write error: " & writeRes.error.msg) - total_messages_exchanged.inc(labelValues = ["sent"]) + total_transfer_messages_exchanged.inc(labelValues = [TransfSend]) return ok() @@ -134,13 +134,13 @@ proc initProtocolHandler(self: SyncTransfer) = # connection closed normally break - total_bytes_exchanged.observe(buffer.len, labelValues = ["transfer_recv"]) + total_bytes_exchanged.observe(buffer.len, labelValues = ["TransfRecv"]) let payload = WakuMessagePayload.decode(buffer).valueOr: error "decoding error", error = $error continue - total_messages_exchanged.inc(labelValues = ["recv"]) + total_transfer_messages_exchanged.inc(labelValues = [TransfRecv]) let msg = payload.message let pubsub = payload.pubsub From 13143726a6be4baff56e6a48390dcdc13a26563b Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 16 Jan 2025 09:21:53 -0500 Subject: [PATCH 4/5] renaming & typo --- waku/waku_store_sync/reconciliation.nim | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 4e2a7237b1..18ff211aaa 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -43,13 +43,13 @@ type SyncReconciliation* = ref object of LPProtocol idsRx: AsyncQueue[ID] # Send Hashes to transfer protocol for reception - wantsTx: AsyncQueue[(PeerId, Fingerprint)] + localWantsTx: AsyncQueue[(PeerId, Fingerprint)] # Send Hashes to transfer protocol for transmission - needsTx: AsyncQueue[(PeerId, Fingerprint)] + remoteNeedsTx: AsyncQueue[(PeerId, Fingerprint)] # params - syncInterval: timer.Duration # Time between each syncronisation attempt + syncInterval: timer.Duration # Time between each synchronization attempt syncRange: timer.Duration # Amount of time in the past to sync relayJitter: Duration # Amount of time since the 
present to ignore when syncing @@ -114,7 +114,7 @@ proc processRequest( let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) for hash in hashToSend: - await self.needsTx.addLast((conn.peerId, hash)) + await self.remoteNeedsTx.addLast((conn.peerId, hash)) for hash in hashToRecv: await self.wantstx.addLast((conn.peerId, hash)) @@ -258,8 +258,8 @@ proc new*( syncInterval: timer.Duration = DefaultSyncInterval, relayJitter: timer.Duration = DefaultGossipSubJitter, idsRx: AsyncQueue[ID], - wantsTx: AsyncQueue[(PeerId, Fingerprint)], - needsTx: AsyncQueue[(PeerId, Fingerprint)], + localWantsTx: AsyncQueue[(PeerId, Fingerprint)], + remoteNeedsTx: AsyncQueue[(PeerId, Fingerprint)], ): Future[Result[T, string]] {.async.} = let res = await initFillStorage(syncRange, wakuArchive) let storage = @@ -276,8 +276,8 @@ proc new*( syncInterval: syncInterval, relayJitter: relayJitter, idsRx: idsRx, - wantsTx: wantsTx, - needsTx: needsTx, + localWantsTx: localWantsTx, + remoteNeedsTx: remoteNeedsTx, ) let handler = proc(conn: Connection, proto: string) {.async, closure.} = From 000672d774edd072a80a889df182867d4f8816ec Mon Sep 17 00:00:00 2001 From: SionoiS Date: Thu, 23 Jan 2025 15:37:06 -0500 Subject: [PATCH 5/5] renaming & fixes --- tests/waku_store_sync/sync_utils.nim | 26 ++++--- tests/waku_store_sync/test_protocol.nim | 82 ++++++++++++---------- waku/waku_archive/archive.nim | 6 +- waku/waku_store_sync/codec.nim | 2 +- waku/waku_store_sync/protocols_metrics.nim | 16 ++--- waku/waku_store_sync/reconciliation.nim | 75 +++++++++++--------- waku/waku_store_sync/transfer.nim | 59 +++++++++------- 7 files changed, 147 insertions(+), 119 deletions(-) diff --git a/tests/waku_store_sync/sync_utils.nim b/tests/waku_store_sync/sync_utils.nim index 6faaee2943..aa56ff2e51 100644 --- a/tests/waku_store_sync/sync_utils.nim +++ b/tests/waku_store_sync/sync_utils.nim @@ -1,6 +1,14 @@ import std/[options, random], chronos, chronicles -import waku/[node/peer_manager, waku_core, waku_store_sync/common], ../testlib/wakucore +import + waku/[ + node/peer_manager, + waku_core, + waku_store_sync/common, + waku_store_sync/reconciliation, + waku_store_sync/transfer, + ], + ../testlib/wakucore randomize() @@ -12,7 +20,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash = return hash -#[ proc newTestWakuRecon*( +proc newTestWakuRecon*( switch: Switch, idsRx: AsyncQueue[SyncID], wantsTx: AsyncQueue[(PeerId, Fingerprint)], @@ -25,8 +33,8 @@ proc randomHash*(rng: var Rand): WakuMessageHash = wakuArchive = nil, relayJitter = 0.seconds, idsRx = idsRx, - wantsTx = wantsTx, - needsTx = needsTx, + localWantsTx = wantsTx, + remoteNeedsTx = needsTx, ) let proto = res.get() @@ -34,9 +42,9 @@ proc randomHash*(rng: var Rand): WakuMessageHash = proto.start() switch.mount(proto) - return proto ]# + return proto -#[ proc newTestWakuTransfer*( +proc newTestWakuTransfer*( switch: Switch, idsTx: AsyncQueue[SyncID], wantsRx: AsyncQueue[(PeerId, Fingerprint)], @@ -48,11 +56,11 @@ proc randomHash*(rng: var Rand): WakuMessageHash = peerManager = peerManager, wakuArchive = nil, idsTx = idsTx, - wantsRx = wantsRx, - needsRx = needsRx, + localWantsRx = wantsRx, + remoteNeedsRx = needsRx, ) proto.start() switch.mount(proto) - return proto ]# + return proto diff --git a/tests/waku_store_sync/test_protocol.nim b/tests/waku_store_sync/test_protocol.nim index f1939438f1..7ca783152f 100644 --- a/tests/waku_store_sync/test_protocol.nim +++ b/tests/waku_store_sync/test_protocol.nim @@ -13,8 +13,13 @@ import waku_core, 
waku_core/message, waku_core/message/digest, - waku_store_sync, + waku_store_sync/common, waku_store_sync/storage/range_processing, + waku_store_sync/reconciliation, + waku_store_sync/transfer, + waku_archive/archive, + waku_archive/driver, + waku_archive/common, ], ../testlib/[wakucore, testasync], ../waku_archive/archive_utils, @@ -25,9 +30,9 @@ suite "Waku Sync: reconciliation": var clientSwitch {.threadvar.}: Switch var - idsChannel {.threadvar.}: AsyncQueue[ID] - localWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + idsChannel {.threadvar.}: AsyncQueue[SyncID] + localWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + remoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] var server {.threadvar.}: SyncReconciliation var client {.threadvar.}: SyncReconciliation @@ -41,9 +46,9 @@ suite "Waku Sync: reconciliation": await allFutures(serverSwitch.start(), clientSwitch.start()) - idsChannel = newAsyncQueue[ID]() - localWants = newAsyncQueue[(PeerId, Fingerprint)]() - remoteNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + idsChannel = newAsyncQueue[SyncID]() + localWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + remoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() server = await newTestWakuRecon(serverSwitch, idsChannel, localWants, remoteNeeds) client = await newTestWakuRecon(clientSwitch, idsChannel, localWants, remoteNeeds) @@ -199,15 +204,15 @@ suite "Waku Sync: reconciliation": timestamp += Timestamp(part) check: - localWants.contains((serverPeerInfo.peerId, Fingerprint(diff))) == false - remoteNeeds.contains((clientPeerInfo.peerId, Fingerprint(diff))) == false + localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == false + remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == false let res = await client.storeSynchronization(some(serverPeerInfo)) assert res.isOk(), $res.error check: - localWants.contains((serverPeerInfo.peerId, Fingerprint(diff))) == true - remoteNeeds.contains((clientPeerInfo.peerId, Fingerprint(diff))) == true + localWants.contains((serverPeerInfo.peerId, WakuMessageHash(diff))) == true + remoteNeeds.contains((clientPeerInfo.peerId, WakuMessageHash(diff))) == true asyncTest "sync 2 nodes 10K msgs 1K diffs": let msgCount = 10_000 @@ -266,16 +271,18 @@ suite "Waku Sync: transfer": clientSwitch {.threadvar.}: Switch var + serverDriver {.threadvar.}: ArchiveDriver + clientDriver {.threadvar.}: ArchiveDriver serverArchive {.threadvar.}: WakuArchive clientArchive {.threadvar.}: WakuArchive var - serverIds {.threadvar.}: AsyncQueue[ID] - serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - clientIds {.threadvar.}: AsyncQueue[ID] - clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] - clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, Fingerprint)] + serverIds {.threadvar.}: AsyncQueue[SyncID] + serverLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + serverRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + clientIds {.threadvar.}: AsyncQueue[SyncID] + clientLocalWants {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] + clientRemoteNeeds {.threadvar.}: AsyncQueue[(PeerId, WakuMessageHash)] var server {.threadvar.}: SyncTransfer @@ -291,8 +298,8 @@ suite "Waku Sync: transfer": await allFutures(serverSwitch.start(), clientSwitch.start()) - let serverDriver = newSqliteArchiveDriver() - let clientDriver = 
newSqliteArchiveDriver() + serverDriver = newSqliteArchiveDriver() + clientDriver = newSqliteArchiveDriver() serverArchive = newWakuArchive(serverDriver) clientArchive = newWakuArchive(clientDriver) @@ -301,28 +308,28 @@ suite "Waku Sync: transfer": serverPeerManager = PeerManager.new(serverSwitch) clientPeerManager = PeerManager.new(clientSwitch) - serverIds = newAsyncQueue[ID]() - serverLocalWants = newAsyncQueue[(PeerId, Fingerprint)]() - serverRemoteNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + serverIds = newAsyncQueue[SyncID]() + serverLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + serverRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() server = SyncTransfer.new( peerManager = serverPeerManager, wakuArchive = serverArchive, idsTx = serverIds, - wantsRx = serverLocalWants, - needsRx = serverRemoteNeeds, + localWantsRx = serverLocalWants, + remoteNeedsRx = serverRemoteNeeds, ) - clientIds = newAsyncQueue[ID]() - clientLocalWants = newAsyncQueue[(PeerId, Fingerprint)]() - clientRemoteNeeds = newAsyncQueue[(PeerId, Fingerprint)]() + clientIds = newAsyncQueue[SyncID]() + clientLocalWants = newAsyncQueue[(PeerId, WakuMessageHash)]() + clientRemoteNeeds = newAsyncQueue[(PeerId, WakuMessageHash)]() client = SyncTransfer.new( peerManager = clientPeerManager, wakuArchive = clientArchive, idsTx = clientIds, - wantsRx = clientLocalWants, - needsRx = clientRemoteNeeds, + localWantsRx = clientLocalWants, + remoteNeedsRx = clientRemoteNeeds, ) server.start() @@ -334,9 +341,10 @@ suite "Waku Sync: transfer": serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() clientPeerInfo = clientSwitch.peerInfo.toRemotePeerInfo() - asyncTeardown: - await sleepAsync(10.milliseconds) + serverPeerManager.addPeer(clientPeerInfo) + clientPeermanager.addPeer(serverPeerInfo) + asyncTeardown: await allFutures(server.stopWait(), client.stopWait()) await allFutures(serverSwitch.stop(), clientSwitch.stop()) @@ -345,18 +353,18 @@ suite "Waku Sync: transfer": let hash = computeMessageHash(DefaultPubsubTopic, msg) let msgs = @[msg] - serverArchive.put(DefaultPubsubTopic, msgs) + serverDriver = serverDriver.put(DefaultPubsubTopic, msgs) # add server info and msg hash to client want channel let want = (serverPeerInfo.peerId, hash) - clientLocalWants.add(want) + await clientLocalWants.put(want) # add client info and msg hash to server need channel let need = (clientPeerInfo.peerId, hash) - serverRemoteNeeds.add(need) + await serverRemoteNeeds.put(need) # give time for transfer to happen - await sleepAsync(10.miliseconds) + await sleepAsync(250.milliseconds) var query = ArchiveQuery() query.includeData = true @@ -365,7 +373,5 @@ suite "Waku Sync: transfer": let res = await clientArchive.findMessages(query) assert res.isOk(), $res.error - let recvMsg = response.messages[0] - check: - msg == recvMsg + msg == res.get().messages[0] diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 914c7366db..b1f30d6bd3 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -122,7 +122,7 @@ proc syncMessageIngress*( msgHash: WakuMessageHash, pubsubTopic: PubsubTopic, msg: WakuMessage, -) {.async.} = +): Future[Result[void, string]] {.async.} = let insertStartTime = getTime().toUnixFloat() (await self.driver.put(msgHash, pubsubTopic, msg)).isOkOr: @@ -133,7 +133,7 @@ proc syncMessageIngress*( contentTopic = msg.contentTopic, timestamp = msg.timestamp, error = error - return + return err("failed to insert message") trace "message archived", msg_hash = msgHash.to0xHex(), 
@@ -144,6 +144,8 @@ proc syncMessageIngress*( let insertDuration = getTime().toUnixFloat() - insertStartTime waku_archive_insert_duration_seconds.observe(insertDuration) + return ok() + proc findMessages*( self: WakuArchive, query: ArchiveQuery ): Future[ArchiveResult] {.async, gcsafe.} = diff --git a/waku/waku_store_sync/codec.nim b/waku/waku_store_sync/codec.nim index d91cca16cd..ee0b926a3a 100644 --- a/waku/waku_store_sync/codec.nim +++ b/waku/waku_store_sync/codec.nim @@ -243,7 +243,7 @@ proc getItemSet( proc deltaDecode*(T: type RangesData, buffer: seq[byte]): Result[T, string] = if buffer.len == 1: - return err("payload too small") + return ok(RangesData()) var payload = RangesData() diff --git a/waku/waku_store_sync/protocols_metrics.nim b/waku/waku_store_sync/protocols_metrics.nim index 64909637d8..2d2776674e 100644 --- a/waku/waku_store_sync/protocols_metrics.nim +++ b/waku/waku_store_sync/protocols_metrics.nim @@ -1,19 +1,17 @@ import metrics -type ProtoDirection* {.pure.} = enum - ReconRecv = "reconciliation_recv" - ReconSend = "reconciliation_sent" - TransfRecv = "transfer_recv" - TransfSend = "transfer_sent" +const + Reconciliation* = "reconciliation" + Transfer* = "transfer" + Receiving* = "receive" + Sending* = "sent" declarePublicHistogram reconciliation_roundtrips, "the nubmer of roundtrips for each reconciliation", buckets = [0.0, 1.0, 2.0, 3.0, 5.0, 10.0, Inf] declarePublicSummary total_bytes_exchanged, - "the number of bytes sent and received by the protocols", - ["transfer_sent", "transfer_recv", "reconciliation_sent", "reconciliation_recv"] + "the number of bytes sent and received by the protocols", ["protocol", "direction"] declarePublicCounter total_transfer_messages_exchanged, - "the number of messages sent and received by the transfer protocol", - ["transfer_sent", "transfer_recv"] + "the number of messages sent and received by the transfer protocol", ["direction"] diff --git a/waku/waku_store_sync/reconciliation.nim b/waku/waku_store_sync/reconciliation.nim index 18ff211aaa..d2ade2ab39 100644 --- a/waku/waku_store_sync/reconciliation.nim +++ b/waku/waku_store_sync/reconciliation.nim @@ -17,7 +17,11 @@ import ../common/protobuf, ../common/paging, ../waku_enr, - ../waku_core, + ../waku_core/codecs, + ../waku_core/time, + ../waku_core/topics/pubsub_topic, + ../waku_core/message/digest, + ../waku_core/message/message, ../node/peer_manager/peer_manager, ../waku_archive, ./common, @@ -40,13 +44,13 @@ type SyncReconciliation* = ref object of LPProtocol storage: SyncStorage # Receive IDs from transfer protocol for storage - idsRx: AsyncQueue[ID] + idsRx: AsyncQueue[SyncID] # Send Hashes to transfer protocol for reception - localWantsTx: AsyncQueue[(PeerId, Fingerprint)] + localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)] # Send Hashes to transfer protocol for transmission - remoteNeedsTx: AsyncQueue[(PeerId, Fingerprint)] + remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)] # params syncInterval: timer.Duration # Time between each synchronization attempt @@ -63,7 +67,7 @@ proc messageIngress*( ) = let msgHash = computeMessageHash(pubsubTopic, msg) - let id = ID(time: msg.timestamp, fingerprint: msgHash) + let id = SyncID(time: msg.timestamp, hash: msgHash) self.storage.insert(id).isOkOr: error "failed to insert new message", msg_hash = msgHash.toHex(), err = error @@ -71,14 +75,14 @@ proc messageIngress*( proc messageIngress*( self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage ) = - let id = ID(time: msg.timestamp, fingerprint: msgHash) + let id = 
SyncID(time: msg.timestamp, hash: msgHash) self.storage.insert(id).isOkOr: error "failed to insert new message", msg_hash = msgHash.toHex(), err = error -proc messageIngress*(self: SyncReconciliation, id: ID) = +proc messageIngress*(self: SyncReconciliation, id: SyncID) = self.storage.insert(id).isOkOr: - error "failed to insert new message", msg_hash = id.fingerprint.toHex(), err = error + error "failed to insert new message", msg_hash = id.hash.toHex(), err = error proc processRequest( self: SyncReconciliation, conn: Connection @@ -92,9 +96,10 @@ proc processRequest( let buffer: seq[byte] = readRes.valueOr: return err("connection read error: " & error.msg) - total_bytes_exchanged.observe(buffer.len, labelValues = [ReconRecv]) + total_bytes_exchanged.observe(buffer.len, labelValues = [Reconciliation, Receiving]) - let recvPayload = SyncPayload.deltaDecode(buffer) + let recvPayload = RangesData.deltaDecode(buffer).valueOr: + return err("payload decoding error: " & error) roundTrips.inc() @@ -103,13 +108,12 @@ proc processRequest( remote = conn.peerId, payload = recvPayload - if recvPayload.ranges.len == 0 or - recvPayload.ranges.allIt(it[1] == RangeType.skipRange): + if recvPayload.ranges.len == 0 or recvPayload.ranges.allIt(it[1] == RangeType.Skip): break var - hashToRecv: seq[Fingerprint] - hashToSend: seq[Fingerprint] + hashToRecv: seq[WakuMessageHash] + hashToSend: seq[WakuMessageHash] let sendPayload = self.storage.processPayload(recvPayload, hashToSend, hashToRecv) @@ -117,11 +121,13 @@ proc processRequest( await self.remoteNeedsTx.addLast((conn.peerId, hash)) for hash in hashToRecv: - await self.wantstx.addLast((conn.peerId, hash)) + await self.localWantstx.addLast((conn.peerId, hash)) let rawPayload = sendPayload.deltaEncode() - total_bytes_exchanged.observe(rawPayload.len, labelValues = [ReconSend]) + total_bytes_exchanged.observe( + rawPayload.len, labelValues = [Reconciliation, Sending] + ) let writeRes = catch: await conn.writeLP(rawPayload) @@ -134,8 +140,7 @@ proc processRequest( remote = conn.peerId, payload = sendPayload - if sendPayload.ranges.len == 0 or - sendPayload.ranges.allIt(it[1] == RangeType.skipRange): + if sendPayload.ranges.len == 0 or sendPayload.ranges.allIt(it[1] == RangeType.Skip): break continue @@ -151,18 +156,22 @@ proc initiate( ): Future[Result[void, string]] {.async.} = let timeRange = calculateTimeRange(self.relayJitter, self.syncRange) - lower = ID(time: timeRange.a, fingerprint: EmptyFingerprint) - upper = ID(time: timeRange.b, fingerprint: FullFingerprint) + lower = SyncID(time: timeRange.a, hash: EmptyFingerprint) + upper = SyncID(time: timeRange.b, hash: FullFingerprint) bounds = lower .. 
upper - fingerprint = self.storage.fingerprinting(bounds) - initPayload = SyncPayload( - ranges: @[(bounds, fingerprintRange)], fingerprints: @[fingerprint], itemSets: @[] + fingerprint = self.storage.computeFingerprint(bounds) + initPayload = RangesData( + ranges: @[(bounds, RangeType.Fingerprint)], + fingerprints: @[fingerprint], + itemSets: @[], ) let sendPayload = initPayload.deltaEncode() - total_bytes_exchanged.observe(sendPayload.len, labelValues = [ReconSend]) + total_bytes_exchanged.observe( + sendPayload.len, labelValues = [Reconciliation, Sending] + ) let writeRes = catch: await connection.writeLP(sendPayload) @@ -183,10 +192,10 @@ proc storeSynchronization*( self: SyncReconciliation, peerInfo: Option[RemotePeerInfo] = none(RemotePeerInfo) ): Future[Result[void, string]] {.async.} = let peer = peerInfo.valueOr: - self.peerManager.selectPeer(SyncReconciliationCodec).valueOr: + self.peerManager.selectPeer(WakuReconciliationCodec).valueOr: return err("no suitable peer found for sync") - let connOpt = await self.peerManager.dialPeer(peer, SyncReconciliationCodec) + let connOpt = await self.peerManager.dialPeer(peer, WakuReconciliationCodec) let conn: Connection = connOpt.valueOr: return err("cannot establish sync connection") @@ -207,7 +216,7 @@ proc storeSynchronization*( proc initFillStorage( syncRange: timer.Duration, wakuArchive: WakuArchive -): Future[Result[seq[ID], string]] {.async.} = +): Future[Result[seq[SyncID], string]] {.async.} = if wakuArchive.isNil(): return err("waku archive unavailable") @@ -227,7 +236,7 @@ proc initFillStorage( debug "initial storage filling started" - var ids = newSeq[ID](DefaultStorageCap) + var ids = newSeq[SyncID](DefaultStorageCap) # we assume IDs are in order @@ -239,7 +248,7 @@ proc initFillStorage( let hash = response.hashes[i] let msg = response.messages[i] - ids.add(ID(time: msg.timestamp, fingerprint: hash)) + ids.add(SyncID(time: msg.timestamp, hash: hash)) if response.cursor.isNone(): break @@ -257,9 +266,9 @@ proc new*( syncRange: timer.Duration = DefaultSyncRange, syncInterval: timer.Duration = DefaultSyncInterval, relayJitter: timer.Duration = DefaultGossipSubJitter, - idsRx: AsyncQueue[ID], - localWantsTx: AsyncQueue[(PeerId, Fingerprint)], - remoteNeedsTx: AsyncQueue[(PeerId, Fingerprint)], + idsRx: AsyncQueue[SyncID], + localWantsTx: AsyncQueue[(PeerId, WakuMessageHash)], + remoteNeedsTx: AsyncQueue[(PeerId, WakuMessageHash)], ): Future[Result[T, string]] {.async.} = let res = await initFillStorage(syncRange, wakuArchive) let storage = @@ -287,7 +296,7 @@ proc new*( return sync.handler = handler - sync.codec = SyncReconciliationCodec + sync.codec = WakuReconciliationCodec info "Store Reconciliation protocol initialized" diff --git a/waku/waku_store_sync/transfer.nim b/waku/waku_store_sync/transfer.nim index 0e5f0332f6..e384f04a51 100644 --- a/waku/waku_store_sync/transfer.nim +++ b/waku/waku_store_sync/transfer.nim @@ -15,7 +15,12 @@ import ../common/nimchronos, ../common/protobuf, ../waku_enr, - ../waku_core, + ../waku_core/codecs, + ../waku_core/time, + ../waku_core/topics/pubsub_topic, + ../waku_core/message/digest, + ../waku_core/message/message, + ../waku_core/message/default_values, ../node/peer_manager/peer_manager, ../waku_archive, ../waku_archive/common, @@ -31,24 +36,24 @@ type SyncTransfer* = ref object of LPProtocol peerManager: PeerManager # Send IDs to reconciliation protocol for storage - idsTx: AsyncQueue[ID] + idsTx: AsyncQueue[SyncID] # Receive Hashes from reconciliation protocol for reception - wantsRx: 
AsyncQueue[(PeerId, Fingerprint)] - wantsRxFut: Future[void] + localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)] + localWantsRxFut: Future[void] inSessions: Table[PeerId, HashSet[WakuMessageHash]] # Receive Hashes from reconciliation protocol for transmission - needsRx: AsyncQueue[(PeerId, Fingerprint)] - needsRxFut: Future[void] + remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)] + remoteNeedsRxFut: Future[void] outSessions: Table[PeerId, Connection] proc sendMessage( - conn: Connection, payload: WakuMessagePayload + conn: Connection, payload: WakuMessageAndTopic ): Future[Result[void, string]] {.async.} = let rawPayload = payload.encode().buffer - total_bytes_exchanged.observe(rawPayload.len, labelValues = [TransfSend]) + total_bytes_exchanged.observe(rawPayload.len, labelValues = [Transfer, Sending]) let writeRes = catch: await conn.writeLP(rawPayload) @@ -56,14 +61,14 @@ proc sendMessage( if writeRes.isErr(): return err("connection write error: " & writeRes.error.msg) - total_transfer_messages_exchanged.inc(labelValues = [TransfSend]) + total_transfer_messages_exchanged.inc(labelValues = [Sending]) return ok() proc openConnection( self: SyncTransfer, peerId: PeerId ): Future[Result[Connection, string]] {.async.} = - let connOpt = await self.peerManager.dialPeer(peerId, SyncTransferCodec) + let connOpt = await self.peerManager.dialPeer(peerId, WakuTransferCodec) let conn: Connection = connOpt.valueOr: return err("Cannot establish transfer connection") @@ -79,7 +84,7 @@ proc wantsReceiverLoop(self: SyncTransfer) {.async.} = ## "supposed to be received" while true: # infinite loop - let (peerId, fingerprint) = await self.wantsRx.popFirst() + let (peerId, fingerprint) = await self.localWantsRx.popFirst() self.inSessions.withValue(peerId, value): value[].incl(fingerprint) @@ -96,7 +101,7 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = ## get the messages from DB and then send them. 
while true: # infinite loop - let (peerId, fingerprint) = await self.needsRx.popFirst() + let (peerId, fingerprint) = await self.remoteNeedsRx.popFirst() if not self.outSessions.hasKey(peerId): let connection = (await self.openConnection(peerId)).valueOr: @@ -116,7 +121,7 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} = continue let msg = - WakuMessagePayload(pubsub: response.topics[0], message: response.messages[0]) + WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0]) (await sendMessage(connection, msg)).isOkOr: error "failed to send message", error = error @@ -134,13 +139,13 @@ proc initProtocolHandler(self: SyncTransfer) = # connection closed normally break - total_bytes_exchanged.observe(buffer.len, labelValues = ["TransfRecv"]) + total_bytes_exchanged.observe(buffer.len, labelValues = [Transfer, Receiving]) - let payload = WakuMessagePayload.decode(buffer).valueOr: + let payload = WakuMessageAndTopic.decode(buffer).valueOr: error "decoding error", error = $error continue - total_transfer_messages_exchanged.inc(labelValues = [TransfRecv]) + total_transfer_messages_exchanged.inc(labelValues = [Receiving]) let msg = payload.message let pubsub = payload.pubsub @@ -164,7 +169,7 @@ proc initProtocolHandler(self: SyncTransfer) = (await self.wakuArchive.syncMessageIngress(hash, pubsub, msg)).isOkOr: continue - let id = Id(time: msg.timestamp, fingerprint: hash) + let id = SyncID(time: msg.timestamp, hash: hash) await self.idsTx.addLast(id) continue @@ -175,22 +180,22 @@ proc initProtocolHandler(self: SyncTransfer) = return self.handler = handler - self.codec = SyncTransferCodec + self.codec = WakuTransferCodec proc new*( T: type SyncTransfer, peerManager: PeerManager, wakuArchive: WakuArchive, - idsTx: AsyncQueue[ID], - wantsRx: AsyncQueue[(PeerId, Fingerprint)], - needsRx: AsyncQueue[(PeerId, Fingerprint)], + idsTx: AsyncQueue[SyncID], + localWantsRx: AsyncQueue[(PeerId, WakuMessageHash)], + remoteNeedsRx: AsyncQueue[(PeerId, WakuMessageHash)], ): T = var transfer = SyncTransfer( peerManager: peerManager, wakuArchive: wakuArchive, idsTx: idsTx, - wantsRx: wantsRx, - needsRx: needsRx, + localWantsRx: localWantsRx, + remoteNeedsRx: remoteNeedsRx, ) transfer.initProtocolHandler() @@ -205,15 +210,15 @@ proc start*(self: SyncTransfer) = self.started = true - self.wantsRxFut = self.wantsReceiverLoop() - self.needsRxFut = self.needsReceiverLoop() + self.localWantsRxFut = self.wantsReceiverLoop() + self.remoteNeedsRxFut = self.needsReceiverLoop() info "Store Sync Transfer protocol started" proc stopWait*(self: SyncTransfer) {.async.} = self.started = false - await self.wantsRxFut.cancelAndWait() - await self.needsRxFut.cancelAndWait() + await self.localWantsRxFut.cancelAndWait() + await self.remoteNeedsRxFut.cancelAndWait() info "Store Sync Transfer protocol stopped"