Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: waku store sync 2.0 storage and tests #3215

Merged
merged 5 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions tests/waku_store_sync/sync_utils.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, random], chronos, chronicles

import waku/[node/peer_manager, waku_core, waku_store_sync], ../testlib/wakucore
import waku/[node/peer_manager, waku_core, waku_store_sync/common], ../testlib/wakucore

randomize()

Expand All @@ -12,7 +12,7 @@ proc randomHash*(rng: var Rand): WakuMessageHash =

return hash

proc newTestWakuRecon*(
#[ proc newTestWakuRecon*(
switch: Switch,
idsRx: AsyncQueue[SyncID],
wantsTx: AsyncQueue[(PeerId, Fingerprint)],
Expand All @@ -34,9 +34,9 @@ proc newTestWakuRecon*(
proto.start()
switch.mount(proto)

return proto
return proto ]#

proc newTestWakuTransfer*(
#[ proc newTestWakuTransfer*(
switch: Switch,
idsTx: AsyncQueue[SyncID],
wantsRx: AsyncQueue[(PeerId, Fingerprint)],
Expand All @@ -55,4 +55,4 @@ proc newTestWakuTransfer*(
proto.start()
switch.mount(proto)

return proto
return proto ]#
23 changes: 11 additions & 12 deletions tests/waku_store_sync/test_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_core/time,
../../waku/waku_store_sync,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/codec,
./sync_utils
Expand All @@ -19,12 +18,12 @@ proc randomItemSet(count: int, startTime: Timestamp, rng: var Rand): ItemSet =
for i in 0 ..< count:
let diff = rng.rand(9.uint8) + 1

let timestamp = lastTime + diff * 1_000_000_000
let timestamp = lastTime + diff * 1_000
lastTime = timestamp

let hash = randomHash(rng)

let id = SyncID(time: Timestamp(timestamp), fingerprint: hash)
let id = SyncID(time: Timestamp(timestamp), hash: hash)

elements.add(id)

Expand All @@ -40,8 +39,8 @@ proc randomSetRange(
ub = itemSet.elements[^1]

#for test check equality
lb.fingerprint = EmptyFingerprint
ub.fingerprint = EmptyFingerprint
lb.hash = EmptyWakuMessageHash
ub.hash = EmptyWakuMessageHash

let bounds = lb .. ub

Expand Down Expand Up @@ -90,9 +89,9 @@ suite "Waku Store Sync Codec":
time = getNowInNanosecondTime()

let (bounds1, itemSet1) = randomSetRange(count, time, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 10_000_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 20_000_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 30_000_000_000, rng)
let (bounds2, itemSet2) = randomSetRange(count, time + 11_000_000, rng)
let (bounds3, itemSet3) = randomSetRange(count, time + 21_000_000, rng)
let (bounds4, itemSet4) = randomSetRange(count, time + 31_000_000, rng)

let range1 = (bounds1, RangeType.ItemSet)
let range2 = (bounds2, RangeType.ItemSet)
Expand Down Expand Up @@ -128,12 +127,12 @@ suite "Waku Store Sync Codec":
ranges = newSeqOfCap[(Slice[SyncID], RangeType)](4)

for i in 0 ..< count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let lb = SyncID(time: Timestamp(lastTime), hash: EmptyWakuMessageHash)

let nowTime = lastTime + 10_000_000_000 # 10s

lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let ub = SyncID(time: Timestamp(nowTime), hash: EmptyWakuMessageHash)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)

Expand Down Expand Up @@ -171,10 +170,10 @@ suite "Waku Store Sync Codec":
fingerprints = newSeqOfCap[Fingerprint](4)

for i in 1 .. count:
let lb = SyncID(time: Timestamp(lastTime), fingerprint: EmptyFingerprint)
let lb = SyncID(time: Timestamp(lastTime), hash: EmptyWakuMessageHash)
let nowTime = lastTime + 10_000_000_000 # 10s
lastTime = nowTime
let ub = SyncID(time: Timestamp(nowTime), fingerprint: EmptyFingerprint)
let ub = SyncID(time: Timestamp(nowTime), hash: EmptyWakuMessageHash)
let bounds = lb .. ub
let range = (bounds, RangeType.Fingerprint)

Expand Down
204 changes: 204 additions & 0 deletions tests/waku_store_sync/test_storage.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
{.used.}

import std/[options, random], testutils/unittests, chronos

import
../../waku/waku_core,
../../waku/waku_core/message/digest,
../../waku/waku_store_sync/common,
../../waku/waku_store_sync/storage/seq_storage,
./sync_utils

suite "Waku Sync Storage":
test "process hash range":
var rng = initRand()
let count = 10_000
var elements = newSeqOfCap[SyncID](count)

for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

var storage1 = SeqStorage.new(elements)
var storage2 = SeqStorage.new(elements)

let lb = elements[0]
let ub = elements[count - 1]
let bounds = lb .. ub
let fingerprint1 = storage1.computeFingerprint(bounds)

var outputPayload: RangesData

storage2.processFingerprintRange(bounds, fingerprint1, outputPayload)

let expected =
RangesData(ranges: @[(bounds, RangeType.Skip)], fingerprints: @[], itemSets: @[])

check:
outputPayload == expected

test "process item set range":
var rng = initRand()
let count = 1000
var elements1 = newSeqOfCap[SyncID](count)
var elements2 = newSeqOfCap[SyncID](count)
var diffs: seq[Fingerprint]

for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements1.add(id)
if rng.rand(0 .. 9) == 0:
elements2.add(id)
else:
diffs.add(id.hash)

var storage1 = SeqStorage.new(elements1)

let lb = elements1[0]
let ub = elements1[count - 1]
let bounds = lb .. ub

let itemSet2 = ItemSet(elements: elements2, reconciled: true)

var
toSend: seq[Fingerprint]
toRecv: seq[Fingerprint]
outputPayload: RangesData

storage1.processItemSetRange(bounds, itemSet2, toSend, toRecv, outputPayload)

check:
toSend == diffs

test "insert new element":
var rng = initRand()

let storage = SeqStorage.new(10)

let element1 = SyncID(time: Timestamp(1000), hash: randomHash(rng))
let element2 = SyncID(time: Timestamp(2000), hash: randomHash(rng))

let res1 = storage.insert(element1)
assert res1.isOk(), $res1.error
let count1 = storage.length()

let res2 = storage.insert(element2)
assert res2.isOk(), $res2.error
let count2 = storage.length()

check:
count1 == 1
count2 == 2

test "insert duplicate":
var rng = initRand()

let element = SyncID(time: Timestamp(1000), hash: randomHash(rng))

let storage = SeqStorage.new(@[element])

let res = storage.insert(element)

check:
res.isErr() == true

test "prune elements":
var rng = initRand()
let count = 1000
var elements = newSeqOfCap[SyncID](count)

for i in 0 ..< count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

let storage = SeqStorage.new(elements)

let beforeCount = storage.length()

let pruned = storage.prune(Timestamp(500))

let afterCount = storage.length()

check:
beforeCount == 1000
pruned == 500
afterCount == 500

## disabled tests are rough benchmark
#[ test "10M fingerprint":
var rng = initRand()

let count = 10_000_000

var elements = newSeqOfCap[SyncID](count)

for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

let storage = SeqStorage.new(elements)

let before = getMonoTime()

discard storage.fingerprinting(some(0 .. count))

let after = getMonoTime()

echo "Fingerprint Time: " & $(after - before) ]#

#[ test "random inserts":
var rng = initRand()

let count = 10_000_000

var elements = newSeqOfCap[SyncID](count)

for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

var storage = SeqStorage.new(elements)

var avg: times.Duration
for i in 0 ..< 1000:
let newId =
SyncID(time: Timestamp(rng.rand(0 .. count)), hash: randomHash(rng))

let before = getMonoTime()

discard storage.insert(newId)

let after = getMonoTime()

avg += after - before

avg = avg div 1000

echo "Avg Time 1K Inserts: " & $avg ]#

#[ test "trim":
var rng = initRand()

let count = 10_000_000

var elements = newSeqOfCap[SyncID](count)

for i in 0 .. count:
let id = SyncID(time: Timestamp(i), hash: randomHash(rng))

elements.add(id)

var storage = SeqStorage.new(elements)

let before = getMonoTime()

discard storage.trim(Timestamp(count div 4))

let after = getMonoTime()

echo "Trim Time: " & $(after - before) ]#
Loading
Loading