Skip to content

Commit

Permalink
feat: waku store sync 2.0 common types & codec (#3213)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Jan 22, 2025
1 parent 505ec84 commit 29fda2d
Show file tree
Hide file tree
Showing 6 changed files with 636 additions and 0 deletions.
58 changes: 58 additions & 0 deletions tests/waku_store_sync/sync_utils.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import std/[options, random], chronos, chronicles

import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore

randomize()

proc randomHash*(rng: var Rand): WakuMessageHash =
  ## Produce a pseudo-random 32-byte message hash from the given RNG state.
  result = EmptyWakuMessageHash
  for b in result.mitems():
    b = rng.rand(uint8)

proc newTestWakuRecon*(
    switch: Switch,
    idsRx: AsyncQueue[SyncID],
    wantsTx: AsyncQueue[(PeerId, Fingerprint)],
    needsTx: AsyncQueue[(PeerId, Fingerprint)],
): Future[SyncReconciliation] {.async.} =
  ## Construct a reconciliation protocol instance wired to the given queues,
  ## start it, mount it on `switch`, and return it.
  ## NOTE(review): the Result from `SyncReconciliation.new` is unwrapped with
  ## `get()` — a construction failure raises, which is acceptable in tests.
  let reconRes = await SyncReconciliation.new(
    peerManager = PeerManager.new(switch),
    wakuArchive = nil,
    relayJitter = 0.seconds,
    idsRx = idsRx,
    wantsTx = wantsTx,
    needsTx = needsTx,
  )

  let recon = reconRes.get()

  recon.start()
  switch.mount(recon)

  return recon

proc newTestWakuTransfer*(
    switch: Switch,
    idsTx: AsyncQueue[SyncID],
    wantsRx: AsyncQueue[(PeerId, Fingerprint)],
    needsRx: AsyncQueue[(PeerId, Fingerprint)],
): SyncTransfer =
  ## Construct a transfer protocol instance wired to the given queues,
  ## start it, mount it on `switch`, and return it.
  let xfer = SyncTransfer.new(
    peerManager = PeerManager.new(switch),
    wakuArchive = nil,
    idsTx = idsTx,
    wantsRx = wantsRx,
    needsRx = needsRx,
  )

  xfer.start()
  switch.mount(xfer)

  return xfer
206 changes: 206 additions & 0 deletions tests/waku_store_sync/test_codec.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
{.used.}

import std/[options, random], testutils/unittests, chronos

import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_core/time,
../../waku/waku_store_sync,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/codec,
./sync_utils

proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet =
  ## Build a reconciled ItemSet of `count` random elements whose timestamps
  ## strictly increase, starting just after `startTime`.
  var items = newSeqOfCap[SyncID](count)
  var prevTime = startTime

  for _ in 0 ..< count:
    # Advance time by a random 1..10 second step so IDs stay ordered.
    let step = rng.rand(9.uint8) + 1
    let stamp = prevTime + step * 1_000_000_000
    prevTime = stamp

    items.add(SyncID(time: Timestamp(stamp), fingerprint: randomHash(rng)))

  ItemSet(elements: items, reconciled: true)

proc randomSetRange(
    count: int, startTime: Timestamp, rng: var Rand
): (Slice[SyncID], ItemSet) =
  ## Build a random ItemSet together with the SyncID slice covering it.
  let itemSet = randomItemSet(count, startTime, rng)

  var lower = itemSet.elements[0]
  var upper = itemSet.elements[^1]

  # Zero out the bound fingerprints so the tests can compare ranges
  # on timestamps alone.
  lower.fingerprint = EmptyFingerprint
  upper.fingerprint = EmptyFingerprint

  (lower .. upper, itemSet)

suite "Waku Store Sync Codec":
  ## Round-trip tests for the store-sync delta encoding: item sets,
  ## fingerprint ranges, and mixed-range payloads.

  test "empty item set encoding roundtrip":
    # A default (empty) item set must survive encode -> decode unchanged.
    var origItemSet = ItemSet()

    origItemSet.reconciled = true

    var encodedSet = origItemSet.deltaEncode()

    var itemSet = ItemSet()
    let _ = deltaDecode(itemSet, encodedSet, 0)

    check:
      origItemSet == itemSet

  test "item set encoding roundtrip":
    let
      count = 10
      time = getNowInNanosecondTime()

    var rng = initRand()

    let origItemSet = randomItemSet(count, time, rng)
    var encodedSet = origItemSet.deltaEncode()

    # Append trailing zero bytes to fake a longer payload; the decoder is
    # expected to stop after `count` items and ignore the padding.
    let pad: seq[byte] =
      @[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    encodedSet &= pad

    var itemSet = ItemSet()
    let _ = deltaDecode(itemSet, encodedSet, count)

    check:
      origItemSet == itemSet

  test "payload item set encoding roundtrip":
    # Four consecutive item-set ranges, 10s apart, encoded as one payload.
    let count = 5

    var
      rng = initRand()
      time = getNowInNanosecondTime()

    let (bounds1, itemSet1) = randomSetRange(count, time, rng)
    let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng)
    let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng)
    let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng)

    let range1 = (bounds1, RangeType.ItemSet)
    let range2 = (bounds2, RangeType.ItemSet)
    let range3 = (bounds3, RangeType.ItemSet)
    let range4 = (bounds4, RangeType.ItemSet)

    let payload = RangesData(
      ranges: @[range1, range2, range3, range4],
      fingerprints: @[],
      itemSets: @[itemSet1, itemSet2, itemSet3, itemSet4],
    )

    let encodedPayload = payload.deltaEncode()

    let res = RangesData.deltaDecode(encodedPayload)
    assert res.isOk(), $res.error

    let decodedPayload = res.get()

    # Only upper bounds are compared: lower bounds are delta-encoded
    # relative to the previous range and are not preserved verbatim.
    check:
      payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
      payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
      payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
      payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
      payload.itemSets == decodedPayload.itemSets

  test "payload fingerprint encoding roundtrip":
    # Four adjacent fingerprint ranges, each spanning 10 seconds.
    let count = 4

    var
      rng = initRand()
      lastTime = getNowInNanosecondTime()
      ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)

    for i in 0 ..< count:
      let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)

      let nowTime = lastTime + 10_000_000_000 # 10s

      lastTime = nowTime
      let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
      let bounds = lb .. ub
      let range = (bounds, RangeType.Fingerprint)

      ranges.add(range)

    let payload = RangesData(
      ranges: ranges,
      fingerprints:
        @[randomHash(rng), randomHash(rng), randomHash(rng), randomHash(rng)],
      itemSets: @[],
    )

    let encodedPayload = payload.deltaEncode()

    let res = RangesData.deltaDecode(encodedPayload)
    assert res.isOk(), $res.error

    let decodedPayload = res.get()

    check:
      payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
      payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
      payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
      payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
      payload.fingerprints == decodedPayload.fingerprints

  test "payload mixed encoding roundtrip":
    # Alternate fingerprint ranges and item-set ranges in one payload
    # (2 of each) and verify both collections round-trip.
    let count = 2

    var
      rng = initRand()
      lastTime = getNowInNanosecondTime()
      ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)
      itemSets = newSeqOfCap[ItemSet](4)
      fingerprints = newSeqOfCap[Fingerprint](4)

    for i in 1 .. count:
      let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
      let nowTime = lastTime + 10_000_000_000 # 10s
      lastTime = nowTime
      let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
      let bounds = lb .. ub
      let range = (bounds, RangeType.Fingerprint)

      ranges.add(range)
      fingerprints.add(randomHash(rng))

      let (bound, itemSet) = randomSetRange(5, lastTime, rng)
      lastTime += 50_000_000_000 # 50s

      ranges.add((bound, RangeType.ItemSet))
      itemSets.add(itemSet)

    let payload =
      RangesData(ranges: ranges, fingerprints: fingerprints, itemSets: itemSets)

    let encodedPayload = payload.deltaEncode()

    let res = RangesData.deltaDecode(encodedPayload)
    assert res.isOk(), $res.error

    let decodedPayload = res.get()

    check:
      payload.ranges[0][0].b == decodedPayload.ranges[0][0].b
      payload.ranges[1][0].b == decodedPayload.ranges[1][0].b
      payload.ranges[2][0].b == decodedPayload.ranges[2][0].b
      payload.ranges[3][0].b == decodedPayload.ranges[3][0].b
      payload.fingerprints == decodedPayload.fingerprints
      payload.itemSets == decodedPayload.itemSets
2 changes: 2 additions & 0 deletions waku/waku_core/codecs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ const
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"
WakuPeerExchangeCodec* = "/vac/waku/peer-exchange/2.0.0-alpha1"
WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4"
8 changes: 8 additions & 0 deletions waku/waku_core/message/digest.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ proc computeMessageHash*(pubsubTopic: PubsubTopic, msg: WakuMessage): WakuMessag
ctx.update(toBytesBE(uint64(msg.timestamp)))

return ctx.finish() # Computes the hash

proc cmp*(x, y: WakuMessageHash): int =
  ## Three-way lexicographic comparison of two message hashes:
  ## -1 when `x < y`, 0 when equal, 1 otherwise.
  if x == y:
    return 0
  if x < y: -1 else: 1
Loading

0 comments on commit 29fda2d

Please sign in to comment.