From 057762a72873d9a26a615a8631607254b891cf94 Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Thu, 18 Jan 2024 14:06:43 +0100 Subject: [PATCH 1/5] fix: don't log NotStarted errors when relaying --- packages/transport/pubsub/src/index.ts | 15 ++++++++++----- packages/transport/stream-interface/src/index.ts | 6 +++++- packages/transport/stream/src/index.ts | 4 ++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/packages/transport/pubsub/src/index.ts b/packages/transport/pubsub/src/index.ts index fb3cb8152..f72247bd4 100644 --- a/packages/transport/pubsub/src/index.ts +++ b/packages/transport/pubsub/src/index.ts @@ -38,7 +38,12 @@ import { getPublicKeyFromPeerId, PublicSignKey } from "@peerbit/crypto"; import { CustomEvent } from "@libp2p/interface"; export const logger = logFn({ module: "lazysub", level: "warn" }); -const logError = (e?: { message: string }) => logger.error(e?.message); +const logError = (e?: { message: string }) => { + logger.error(e?.message); +}; +const logErrorIfStarted = (e?: { message: string }) => { + e instanceof NotStartedError === false && logError(e); +}; export interface PeerStreamsInit { id: Libp2pPeerId; @@ -553,7 +558,7 @@ export class DirectSub extends DirectStream implements PubSub { message.header.mode instanceof SeekDelivery ) { // DONT await this since it might introduce a dead-lock - this.relayMessage(from, message).catch(logError); + this.relayMessage(from, message).catch(logErrorIfStarted); } } else { if ((await this.verifyAndProcess(message)) === false) { @@ -657,7 +662,7 @@ export class DirectSub extends DirectStream implements PubSub { // Forward // DONT await this since it might introduce a dead-lock - this.relayMessage(from, message).catch(logError); + this.relayMessage(from, message).catch(logErrorIfStarted); } else if (pubsubMessage instanceof Unsubscribe) { if (this.subscriptionMessageIsLatest(message, pubsubMessage)) { const changed: string[] = []; @@ -693,7 +698,7 @@ export class DirectSub extends DirectStream implements PubSub { } // DONT await this since it might introduce a dead-lock - this.relayMessage(from, message).catch(logError); + this.relayMessage(from, message).catch(logErrorIfStarted); } else if (pubsubMessage instanceof GetSubscribers) { const subscriptionsToSend: string[] = this.getSubscriptionOverlap( pubsubMessage.topics @@ -723,7 +728,7 @@ export class DirectSub extends DirectStream implements PubSub { // Forward // DONT await this since it might introduce a dead-lock - this.relayMessage(from, message).catch(logError); + this.relayMessage(from, message).catch(logErrorIfStarted); } } return true; diff --git a/packages/transport/stream-interface/src/index.ts b/packages/transport/stream-interface/src/index.ts index 50d225f2a..af709b133 100644 --- a/packages/transport/stream-interface/src/index.ts +++ b/packages/transport/stream-interface/src/index.ts @@ -24,4 +24,8 @@ export interface WaitForPeer { ): Promise; } -export class NotStartedError extends Error {} +export class NotStartedError extends Error { + constructor() { + super("Not started"); + } +} diff --git a/packages/transport/stream/src/index.ts b/packages/transport/stream/src/index.ts index 43ef9cf00..a880c3497 100644 --- a/packages/transport/stream/src/index.ts +++ b/packages/transport/stream/src/index.ts @@ -1662,8 +1662,8 @@ export abstract class DirectStream< to?: PeerStreams[] | Map, relayed?: boolean ): Promise { - if (this.stopping) { - return; + if (this.stopping || !this.started) { + throw new NotStartedError(); } let delivereyPromise: 
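// the publish path now rejects with NotStartedError when the stream is
// stopping or was never started, instead of returning silently; the
// pubsub relay call sites above catch that rejection with
// logErrorIfStarted, so expected shutdown races stay quiet while any
// other relay failure is still logged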
Promise | undefined = undefined as any; From 7f70351c2506fd827e5e1b910c1664f3c8e358f0 Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Thu, 18 Jan 2024 15:18:53 +0100 Subject: [PATCH 2/5] fix: reset size counter correctly when building messages --- packages/programs/data/shared-log/src/exchange-heads.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/programs/data/shared-log/src/exchange-heads.ts b/packages/programs/data/shared-log/src/exchange-heads.ts index 96725fd8c..b0eab949f 100644 --- a/packages/programs/data/shared-log/src/exchange-heads.ts +++ b/packages/programs/data/shared-log/src/exchange-heads.ts @@ -112,6 +112,7 @@ export const createExchangeHeadsMessages = async ( size += fromHead.size; if (size > MAX_EXCHANGE_MESSAGE_SIZE) { + size = 0; messages.push( new ExchangeHeadsMessage({ heads: current From 6abfdbbfd49c8ed2121f2cb3b84faef0fcfeb361 Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Thu, 18 Jan 2024 15:19:05 +0100 Subject: [PATCH 3/5] fix: add replication test for large heads --- .../src/__tests__/replicate.test.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/packages/programs/data/shared-log/src/__tests__/replicate.test.ts b/packages/programs/data/shared-log/src/__tests__/replicate.test.ts index 6411bfb8d..fa150c8bd 100644 --- a/packages/programs/data/shared-log/src/__tests__/replicate.test.ts +++ b/packages/programs/data/shared-log/src/__tests__/replicate.test.ts @@ -313,6 +313,27 @@ describe(`exchange`, function () { } } }); + it("replicates database of large entries", async () => { + let count = 10; + for (let i = 0; i < count; i++) { + const value = toBase64(randomBytes(4e6)); + await db1.add(value, { meta: { next: [] } }); // force unique heads + } + db2 = (await EventStore.open>( + db1.address!, + session.peers[1], + { + args: { + role: { + type: "replicator", + factor: 1 + } + } + } + ))!; + + await waitForResolved(() => expect(db2.log.log.length).toEqual(count)); + }); describe("redundancy", () => { it("only sends entries once", async () => { From 7bb41fc8f148d5ff6f32f6e8824556d0e585aea2 Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Thu, 18 Jan 2024 15:59:57 +0100 Subject: [PATCH 4/5] fix: cleanly close protocol --- packages/transport/stream/src/index.ts | 27 +++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/packages/transport/stream/src/index.ts b/packages/transport/stream/src/index.ts index a880c3497..f455699af 100644 --- a/packages/transport/stream/src/index.ts +++ b/packages/transport/stream/src/index.ts @@ -62,6 +62,7 @@ import { import { MultiAddrinfo } from "@peerbit/stream-interface"; import { BandwidthTracker } from "./stats.js"; +export { BandwidthTracker }; // might be useful for others const logError = (e?: { message: string }) => { return logger.error(e?.message); @@ -618,35 +619,35 @@ export abstract class DirectStream< return; } - // reset and clear up - - this.started = false; - - this.closeController.abort(); - clearTimeout(this.pruneConnectionsTimeout); await Promise.all( this.multicodecs.map((x) => this.components.registrar.unhandle(x)) ); + // unregister protocol and handlers + if (this._registrarTopologyIds != null) { + this._registrarTopologyIds?.map((id) => + this.components.registrar.unregister(id) + ); + } + + // reset and clear up + this.started = false; + + this.closeController.abort(); + logger.debug("stopping"); for (const peerStreams of this.peers.values()) { await peerStreams.close(); } + for (const [k, v] of this.healthChecks) { 
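// cancel the pending health-check timer so no late callback touches
// peer streams that are being torn down below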
clearTimeout(v); } this.healthChecks.clear(); this.prunedConnectionsCache?.clear(); - // unregister protocol and handlers - if (this._registrarTopologyIds != null) { - this._registrarTopologyIds?.map((id) => - this.components.registrar.unregister(id) - ); - } - this.queue.clear(); this.peers.clear(); this.seenCache.clear(); From d1d48c95858428a766315df4241c97f6b458682a Mon Sep 17 00:00:00 2001 From: Marcus Pousette Date: Thu, 18 Jan 2024 16:00:18 +0100 Subject: [PATCH 5/5] fix: allow documents of at most 5mb size --- .../src/__tests__/index.integration.test.ts | 1621 +++++++++-------- .../data/document/src/document-index.ts | 186 +- .../data/document/src/document-store.ts | 16 +- 3 files changed, 996 insertions(+), 827 deletions(-) diff --git a/packages/programs/data/document/src/__tests__/index.integration.test.ts b/packages/programs/data/document/src/__tests__/index.integration.test.ts index 6a34f4340..bf50fc7df 100644 --- a/packages/programs/data/document/src/__tests__/index.integration.test.ts +++ b/packages/programs/data/document/src/__tests__/index.integration.test.ts @@ -266,6 +266,35 @@ describe("index", () => { (await store.docs.log.log.values.toArray()).map((x) => x.hash) ).toEqual([deleteOperation.hash]); // the delete operation }); + + it("rejects on max message size", async () => { + store = new TestStore({ + docs: new Documents({ + immutable: false + }) + }); + await session.peers[0].open(store); + + // not ok + await expect( + store.docs.put( + new Document({ + id: uuid(), + data: randomBytes(5e6) + }) + ) + ).rejects.toThrow( + "Document is too large (5.00005) mb). Needs to be less than 5 mb" + ); + + // ok + await store.docs.put( + new Document({ + id: uuid(), + data: randomBytes(5e6 - 100) + }) + ); + }); }); describe("replication", () => { @@ -864,242 +893,374 @@ describe("index", () => { }); describe("search", () => { - let peersCount = 3, - stores: TestStore[] = [], - writeStore: TestStore, - canRead: ( - | undefined - | ((obj: any, publicKey: PublicSignKey) => Promise) - )[] = [], - canSearch: ( - | undefined - | (( - query: AbstractSearchRequest, - publicKey: PublicSignKey - ) => Promise) - )[] = []; - beforeAll(async () => { - session = await TestSession.connected(peersCount); - }); + describe("fields", () => { + let peersCount = 3, + stores: TestStore[] = [], + writeStore: TestStore, + canRead: ( + | undefined + | ((obj: any, publicKey: PublicSignKey) => Promise) + )[] = [], + canSearch: ( + | undefined + | (( + query: AbstractSearchRequest, + publicKey: PublicSignKey + ) => Promise) + )[] = []; + beforeAll(async () => { + session = await TestSession.connected(peersCount); + }); + + afterAll(async () => { + await session.stop(); + }); + + beforeEach(async () => { + stores = []; + // Create store + for (let i = 0; i < peersCount; i++) { + const store = + i > 0 + ? (await TestStore.load( + stores[0].address!, + session.peers[i].services.blocks + ))! + : new TestStore({ + docs: new Documents() + }); + await session.peers[i].open(store, { + args: { + role: i === 0 ? { type: "replicator", factor: 1 } : "observer", + index: { + canRead: + i === 0 + ? (obj, key) => { + return canRead[i] ? canRead[i]!(obj, key) : true; + } + : undefined, + canSearch: + i === 0 + ? (query, key) => { + return canSearch[i] + ? 
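// the per-peer canRead/canSearch arrays let each test install its own
// access-control behavior later without reopening the store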
canSearch[i]!(query, key) + : true; + } + : undefined + } + } + }); + stores.push(store); + } - afterAll(async () => { - await session.stop(); - }); + writeStore = stores[0]; - beforeEach(async () => { - stores = []; - // Create store - for (let i = 0; i < peersCount; i++) { - const store = - i > 0 - ? (await TestStore.load( - stores[0].address!, - session.peers[i].services.blocks - ))! - : new TestStore({ - docs: new Documents() - }); - await session.peers[i].open(store, { - args: { - role: i === 0 ? { type: "replicator", factor: 1 } : "observer", - index: { - canRead: - i === 0 - ? (obj, key) => { - return canRead[i] ? canRead[i]!(obj, key) : true; - } - : undefined, - canSearch: - i === 0 - ? (query, key) => { - return canSearch[i] ? canSearch[i]!(query, key) : true; - } - : undefined - } - } + let doc = new Document({ + id: Buffer.from("1"), + name: "hello", + number: 1n }); - stores.push(store); - } - writeStore = stores[0]; - - let doc = new Document({ - id: Buffer.from("1"), - name: "hello", - number: 1n - }); + let docEdit = new Document({ + id: Buffer.from("1"), + name: "hello world", + number: 1n, + bool: true, + data: new Uint8Array([1]) + }); - let docEdit = new Document({ - id: Buffer.from("1"), - name: "hello world", - number: 1n, - bool: true, - data: new Uint8Array([1]) - }); + let doc2 = new Document({ + id: Buffer.from("2"), + name: "hello world", + number: 4n + }); - let doc2 = new Document({ - id: Buffer.from("2"), - name: "hello world", - number: 4n - }); - - let doc2Edit = new Document({ - id: Buffer.from("2"), - name: "Hello World", - number: 2n, - data: new Uint8Array([2]) - }); - - let doc3 = new Document({ - id: Buffer.from("3"), - name: "foo", - number: 3n, - data: new Uint8Array([3]) - }); - - let doc4 = new Document({ - id: Buffer.from("4"), - name: undefined, - number: undefined - }); - - await writeStore.docs.put(doc); - await waitFor(() => writeStore.docs.index.size === 1); - await writeStore.docs.put(docEdit); - await writeStore.docs.put(doc2); - await waitFor(() => writeStore.docs.index.size === 2); - await writeStore.docs.put(doc2Edit); - await writeStore.docs.put(doc3); - await writeStore.docs.put(doc4); - await waitFor(() => writeStore.docs.index.size === 4); - - expect(stores[0].docs.log.role).toBeInstanceOf(Replicator); - expect(stores[1].docs.log.role).toBeInstanceOf(Observer); - await stores[1].waitFor(session.peers[0].peerId); - await stores[1].docs.log.waitForReplicator( - session.peers[0].identity.publicKey - ); - await stores[0].waitFor(session.peers[1].peerId); - canRead = new Array(stores.length).fill(undefined); - canSearch = new Array(stores.length).fill(undefined); - }); + let doc2Edit = new Document({ + id: Buffer.from("2"), + name: "Hello World", + number: 2n, + data: new Uint8Array([2]) + }); - afterEach(async () => { - await Promise.all(stores.map((x) => x.drop())); - }); + let doc3 = new Document({ + id: Buffer.from("3"), + name: "foo", + number: 3n, + data: new Uint8Array([3]) + }); - it("no-args", async () => { - let results: Document[] = await stores[0].docs.index.search( - new SearchRequest({ query: [] }) - ); - expect(results).toHaveLength(4); - }); + let doc4 = new Document({ + id: Buffer.from("4"), + name: undefined, + number: undefined + }); - it("match locally", async () => { - let results: Document[] = await stores[0].docs.index.search( - new SearchRequest({ - query: [] - }), - { remote: false } - ); - expect(results).toHaveLength(4); - }); + await writeStore.docs.put(doc); + await waitFor(() => writeStore.docs.index.size === 
1); + await writeStore.docs.put(docEdit); + await writeStore.docs.put(doc2); + await waitFor(() => writeStore.docs.index.size === 2); + await writeStore.docs.put(doc2Edit); + await writeStore.docs.put(doc3); + await writeStore.docs.put(doc4); + await waitFor(() => writeStore.docs.index.size === 4); + + expect(stores[0].docs.log.role).toBeInstanceOf(Replicator); + expect(stores[1].docs.log.role).toBeInstanceOf(Observer); + await stores[1].waitFor(session.peers[0].peerId); + await stores[1].docs.log.waitForReplicator( + session.peers[0].identity.publicKey + ); + await stores[0].waitFor(session.peers[1].peerId); + canRead = new Array(stores.length).fill(undefined); + canSearch = new Array(stores.length).fill(undefined); + }); - it("match all", async () => { - let results: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [] - }), - { remote: { amount: 1 } } - ); - expect(results).toHaveLength(4); - }); + afterEach(async () => { + await Promise.all(stores.map((x) => x.drop())); + }); - describe("sync", () => { - it("can match sync", async () => { - expect(stores[1].docs.index.size).toEqual(0); - let canPerformEvents = 0; - let canPerform = stores[1].docs["_optionCanPerform"]?.bind( - stores[1].docs + it("no-args", async () => { + let results: Document[] = await stores[0].docs.index.search( + new SearchRequest({ query: [] }) ); - let syncEvents = 0; - let sync = stores[1].docs.index["_sync"].bind(stores[1].docs.index); - stores[1].docs.index["_sync"] = async (r) => { - syncEvents += 1; - return sync(r); - }; - stores[1].docs["_optionCanPerform"] = async (a, b) => { - canPerformEvents += 1; - return !canPerform || canPerform(a, b); - }; + expect(results).toHaveLength(4); + }); - await stores[1].docs.index.search( + it("match locally", async () => { + let results: Document[] = await stores[0].docs.index.search( new SearchRequest({ query: [] }), - { remote: { amount: 1, sync: true } } + { remote: false } ); - await waitFor(() => stores[1].docs.index.size === 4); - expect(stores[1].docs.log.log.length).toEqual(6); // 4 documents where 2 have been edited once (4 + 2) - expect(canPerformEvents).toEqual(6); // 4 documents where 2 have been edited once (4 + 2) - expect(syncEvents).toEqual(1); + expect(results).toHaveLength(4); + }); - await stores[1].docs.index.search( + it("match all", async () => { + let results: Document[] = await stores[1].docs.index.search( new SearchRequest({ query: [] }), - { remote: { amount: 1, sync: true } } + { remote: { amount: 1 } } ); - await waitFor(() => syncEvents == 2); - expect(canPerformEvents).toEqual(6); // no new checks, since all docs already added + expect(results).toHaveLength(4); }); - it("will not sync already existing", async () => {}); - }); - describe("string", () => { - it("exact", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new StringMatch({ - key: "name", - value: "hello world", - caseInsensitive: true + describe("sync", () => { + it("can match sync", async () => { + expect(stores[1].docs.index.size).toEqual(0); + let canPerformEvents = 0; + let canPerform = stores[1].docs["_optionCanPerform"]?.bind( + stores[1].docs + ); + let syncEvents = 0; + let sync = stores[1].docs.index["_sync"].bind(stores[1].docs.index); + stores[1].docs.index["_sync"] = async (r) => { + syncEvents += 1; + return sync(r); + }; + stores[1].docs["_optionCanPerform"] = async (a, b) => { + canPerformEvents += 1; + return !canPerform || canPerform(a, b); + }; + + await 
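// sync: true makes the replicator ship the matched heads so they are
// persisted into this node's log; the log.length and canPerform
// counts asserted below depend on that persistence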
stores[1].docs.index.search( + new SearchRequest({ + query: [] + }), + { remote: { amount: 1, sync: true } } + ); + await waitFor(() => stores[1].docs.index.size === 4); + expect(stores[1].docs.log.log.length).toEqual(6); // 4 documents where 2 have been edited once (4 + 2) + expect(canPerformEvents).toEqual(6); // 4 documents where 2 have been edited once (4 + 2) + expect(syncEvents).toEqual(1); + + await stores[1].docs.index.search( + new SearchRequest({ + query: [] + }), + { remote: { amount: 1, sync: true } } + ); + await waitFor(() => syncEvents == 2); + expect(canPerformEvents).toEqual(6); // no new checks, since all docs already added + }); + it("will not sync already existing", async () => {}); + }); + + describe("string", () => { + it("exact", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "name", + value: "hello world", + caseInsensitive: true + }) + ] + }) + ); + expect( + responses.map((x) => Buffer.from(x.id).toString()) + ).toContainAllValues(["1", "2"]); + }); + + it("exact-case-insensitive", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "name", + value: "Hello World", + caseInsensitive: true + }) + ] + }) + ); + expect(responses).toHaveLength(2); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["1", "2"]); + }); + + it("exact case sensitive", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "name", + value: "Hello World", + caseInsensitive: false + }) + ] + }) + ); + expect(responses).toHaveLength(1); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["2"]); + responses = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "name", + value: "hello world", + caseInsensitive: false + }) + ] + }) + ); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["1"]); + }); + it("prefix", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "name", + value: "hel", + method: StringMatchMethod.prefix, + caseInsensitive: true + }) + ] + }), + { remote: { amount: 1 } } + ); + expect(responses).toHaveLength(2); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["1", "2"]); + }); + + it("contains", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "name", + value: "ello", + method: StringMatchMethod.contains, + caseInsensitive: true + }) + ] + }) + ); + expect(responses).toHaveLength(2); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["1", "2"]); + }); + + describe("arr", () => { + let docArray1 = new Document({ + id: Buffer.from("a"), + name: "_", + number: undefined, + tags: ["Hello", "World"] + }); + + let docArray2 = new Document({ + id: Buffer.from("b"), + name: "__", + number: undefined, + tags: ["Hello"] + }); + beforeEach(async () => { + await writeStore.docs.put(docArray1); + await writeStore.docs.put(docArray2); + }); + afterEach(async () => { + await writeStore.docs.del(docArray1.id); + await writeStore.docs.del(docArray2.id); + }); + it("arr", async () => { + let responses: Document[] = 
await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: "tags", + value: "world", + method: StringMatchMethod.contains, + caseInsensitive: true + }) + ] }) - ] - }) - ); - expect( - responses.map((x) => Buffer.from(x.id).toString()) - ).toContainAllValues(["1", "2"]); + ); + expect(responses).toHaveLength(1); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["a"]); + }); + }); }); - it("exact-case-insensitive", async () => { + it("missing", async () => { let responses: Document[] = await stores[1].docs.index.search( new SearchRequest({ query: [ - new StringMatch({ - key: "name", - value: "Hello World", - caseInsensitive: true + new MissingField({ + key: "name" }) ] - }) + }), + { remote: { amount: 1 } } ); - expect(responses).toHaveLength(2); + expect(responses).toHaveLength(1); expect( responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["1", "2"]); + ).toEqual(["4"]); }); - it("exact case sensitive", async () => { + it("bytes", async () => { let responses: Document[] = await stores[1].docs.index.search( new SearchRequest({ query: [ - new StringMatch({ - key: "name", - value: "Hello World", - caseInsensitive: false + new ByteMatchQuery({ + key: "data", + value: Buffer.from([1]) }) ] }) @@ -1107,651 +1268,583 @@ describe("index", () => { expect(responses).toHaveLength(1); expect( responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["2"]); - responses = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new StringMatch({ - key: "name", - value: "hello world", - caseInsensitive: false - }) - ] - }) - ); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["1"]); + ).toEqual(["1"]); }); - it("prefix", async () => { + + it("bool", async () => { let responses: Document[] = await stores[1].docs.index.search( new SearchRequest({ query: [ - new StringMatch({ - key: "name", - value: "hel", - method: StringMatchMethod.prefix, - caseInsensitive: true + new BoolQuery({ + key: "bool", + value: true }) ] }), { remote: { amount: 1 } } ); - expect(responses).toHaveLength(2); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["1", "2"]); - }); - - it("contains", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new StringMatch({ - key: "name", - value: "ello", - method: StringMatchMethod.contains, - caseInsensitive: true - }) - ] - }) - ); - expect(responses).toHaveLength(2); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["1", "2"]); - }); - - describe("arr", () => { - let docArray1 = new Document({ - id: Buffer.from("a"), - name: "_", - number: undefined, - tags: ["Hello", "World"] - }); - - let docArray2 = new Document({ - id: Buffer.from("b"), - name: "__", - number: undefined, - tags: ["Hello"] - }); - beforeEach(async () => { - await writeStore.docs.put(docArray1); - await writeStore.docs.put(docArray2); - }); - afterEach(async () => { - await writeStore.docs.del(docArray1.id); - await writeStore.docs.del(docArray2.id); - }); - it("arr", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new StringMatch({ - key: "tags", - value: "world", - method: StringMatchMethod.contains, - caseInsensitive: true - }) - ] - }) - ); - expect(responses).toHaveLength(1); - expect( - responses.map((x) => 
Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["a"]); - }); - }); - }); - - it("missing", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new MissingField({ - key: "name" - }) - ] - }), - { remote: { amount: 1 } } - ); - expect(responses).toHaveLength(1); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toEqual(["4"]); - }); - - it("bytes", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new ByteMatchQuery({ - key: "data", - value: Buffer.from([1]) - }) - ] - }) - ); - expect(responses).toHaveLength(1); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toEqual(["1"]); - }); - - it("bool", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new BoolQuery({ - key: "bool", - value: true - }) - ] - }), - { remote: { amount: 1 } } - ); - expect(responses).toHaveLength(1); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toEqual(["1"]); - }); + expect(responses).toHaveLength(1); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toEqual(["1"]); + }); - describe("array", () => { - describe("nested store", () => { - @variant("test-nested-document-store") - class NestedDocument extends Program { - @field({ type: Uint8Array }) - id: Uint8Array; + describe("array", () => { + describe("nested store", () => { + @variant("test-nested-document-store") + class NestedDocument extends Program { + @field({ type: Uint8Array }) + id: Uint8Array; - @field({ type: Documents }) - documents: Documents; + @field({ type: Documents }) + documents: Documents; - constructor(document: Documents) { - super(); - this.id = randomBytes(32); - this.documents = document; - } - open(args?: any): Promise { - return this.documents.open({ type: Document }); + constructor(document: Documents) { + super(); + this.id = randomBytes(32); + this.documents = document; + } + open(args?: any): Promise { + return this.documents.open({ type: Document }); + } } - } - @variant("test-nested-nested-document-store") - class NestedDocumentStore extends Program< - Partial> - > { - @field({ type: Uint8Array }) - id: Uint8Array; + @variant("test-nested-nested-document-store") + class NestedDocumentStore extends Program< + Partial> + > { + @field({ type: Uint8Array }) + id: Uint8Array; - @field({ type: Documents }) - documents: Documents; + @field({ type: Documents }) + documents: Documents; - constructor(properties: { docs: Documents }) { - super(); - this.id = randomBytes(32); - this.documents = properties.docs; - } + constructor(properties: { docs: Documents }) { + super(); + this.id = randomBytes(32); + this.documents = properties.docs; + } - async open( - options?: Partial> - ): Promise { - await this.documents.open({ - ...options, - type: NestedDocument, - index: { ...options?.index, key: "id" }, - canOpen: () => true - }); + async open( + options?: Partial> + ): Promise { + await this.documents.open({ + ...options, + type: NestedDocument, + index: { ...options?.index, key: "id" }, + canOpen: () => true + }); + } } - } - - it("nested document store", async () => { - const nestedStore = await session.peers[0].open( - new NestedDocumentStore({ docs: new Documents() }) - ); - const nestedDoc = new NestedDocument(new Documents()); - await session.peers[0].open(nestedDoc); - const document = new Document({ - id: randomBytes(32), - name: "hello" - }); - 
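// the nested put below is what the ["documents", "name"] key-path
// query at the end of this test resolves: the outer index recurses
// into the inner document store's index for sub-document matches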
await nestedDoc.documents.put(document); - await nestedStore.documents.put(nestedDoc); - const nestedStore2 = - await session.peers[1].open( - nestedStore.address, - { args: { role: "observer" } } + it("nested document store", async () => { + const nestedStore = await session.peers[0].open( + new NestedDocumentStore({ docs: new Documents() }) ); - await nestedStore2.documents.log.waitForReplicator( - session.peers[0].identity.publicKey - ); - const results = await nestedStore2.documents.index.search( - new SearchRequest({ - query: [ - new StringMatch({ - key: ["documents", "name"], - value: "hello" - }) - ] - }) - ); - expect(results.length).toEqual(1); + const nestedDoc = new NestedDocument(new Documents()); + await session.peers[0].open(nestedDoc); + const document = new Document({ + id: randomBytes(32), + name: "hello" + }); + await nestedDoc.documents.put(document); + await nestedStore.documents.put(nestedDoc); + + const nestedStore2 = + await session.peers[1].open( + nestedStore.address, + { args: { role: "observer" } } + ); + await nestedStore2.documents.log.waitForReplicator( + session.peers[0].identity.publicKey + ); + const results = await nestedStore2.documents.index.search( + new SearchRequest({ + query: [ + new StringMatch({ + key: ["documents", "name"], + value: "hello" + }) + ] + }) + ); + expect(results.length).toEqual(1); + }); }); - }); - - describe("multi-dimensional", () => { - class MultiDimensionalDoc { - @field({ type: Uint8Array }) - id: Uint8Array; - @field({ type: option(vec(Uint8Array)) }) - bytesArrays?: Uint8Array[]; + describe("multi-dimensional", () => { + class MultiDimensionalDoc { + @field({ type: Uint8Array }) + id: Uint8Array; - @field({ type: option(vec(vec("u32"))) }) - matrix?: number[][]; - - @field({ type: option(vec(Document)) }) - documents?: Document[]; - - constructor(properties?: { + @field({ type: option(vec(Uint8Array)) }) bytesArrays?: Uint8Array[]; + + @field({ type: option(vec(vec("u32"))) }) matrix?: number[][]; + + @field({ type: option(vec(Document)) }) documents?: Document[]; - }) { - this.id = randomBytes(32); - this.matrix = properties?.matrix; - this.bytesArrays = properties?.bytesArrays; - this.documents = properties?.documents; + + constructor(properties?: { + bytesArrays?: Uint8Array[]; + matrix?: number[][]; + documents?: Document[]; + }) { + this.id = randomBytes(32); + this.matrix = properties?.matrix; + this.bytesArrays = properties?.bytesArrays; + this.documents = properties?.documents; + } } - } - @variant("test-multidim-doc-store") - class MultiDimensionalDocStore extends Program { - @field({ type: Documents }) - documents: Documents; + @variant("test-multidim-doc-store") + class MultiDimensionalDocStore extends Program { + @field({ type: Documents }) + documents: Documents; - constructor() { - super(); - this.documents = new Documents(); + constructor() { + super(); + this.documents = new Documents(); + } + open(args?: Partial>): Promise { + return this.documents.open({ + ...args, + type: MultiDimensionalDoc + }); + } } - open(args?: Partial>): Promise { - return this.documents.open({ - ...args, - type: MultiDimensionalDoc + + it("uint8array[]", async () => { + const docs = await session.peers[0].open( + new MultiDimensionalDocStore() + ); + + const d1 = new MultiDimensionalDoc({ + bytesArrays: [new Uint8Array([1]), new Uint8Array([2])] }); - } - } + await docs.documents.put(d1); + await docs.documents.put( + new MultiDimensionalDoc({ bytesArrays: [new Uint8Array([3])] }) + ); - it("uint8array[]", async () => { - const 
docs = await session.peers[0].open( - new MultiDimensionalDocStore() - ); + const docsObserver = + await session.peers[1].open( + docs.address, + { args: { role: "observer" } } + ); + await docsObserver.documents.log.waitForReplicator( + session.peers[0].identity.publicKey + ); - const d1 = new MultiDimensionalDoc({ - bytesArrays: [new Uint8Array([1]), new Uint8Array([2])] + const results = await docsObserver.documents.index.search( + new SearchRequest({ + query: [ + new ByteMatchQuery({ + key: "bytesArrays", + value: new Uint8Array([2]) + }) + ] + }) + ); + expect(results.map((x) => x.id)).toEqual([new Uint8Array(d1.id)]); }); - await docs.documents.put(d1); - await docs.documents.put( - new MultiDimensionalDoc({ bytesArrays: [new Uint8Array([3])] }) - ); - const docsObserver = - await session.peers[1].open( - docs.address, - { args: { role: "observer" } } + it("number[][]", async () => { + const docs = await session.peers[0].open( + new MultiDimensionalDocStore() ); - await docsObserver.documents.log.waitForReplicator( - session.peers[0].identity.publicKey - ); - const results = await docsObserver.documents.index.search( - new SearchRequest({ - query: [ - new ByteMatchQuery({ - key: "bytesArrays", - value: new Uint8Array([2]) + const d1 = new MultiDimensionalDoc({ matrix: [[1, 2], [3]] }); + await docs.documents.put(d1); + await docs.documents.put( + new MultiDimensionalDoc({ matrix: [[4, 5]] }) + ); + + const docsObserver = + await session.peers[1].open( + docs.address, + { args: { role: "observer" } } + ); + await docsObserver.documents.log.waitForReplicator( + session.peers[0].identity.publicKey + ); + + const results = await docsObserver.documents.index.search( + new SearchRequest({ + query: new IntegerCompare({ + key: "matrix", + compare: Compare.Equal, + value: 2 }) - ] - }) + }) + ); + expect(results.map((x) => x.id)).toEqual([new Uint8Array(d1.id)]); + }); + + it("Document[]", async () => { + const docs = await session.peers[0].open( + new MultiDimensionalDocStore() + ); + + const d1 = new MultiDimensionalDoc({ + documents: [new Document({ id: randomBytes(32), number: 123n })] + }); + await docs.documents.put(d1); + await docs.documents.put( + new MultiDimensionalDoc({ + documents: [ + new Document({ id: randomBytes(32), number: 124n }) + ] + }) + ); + + const docsObserver = + await session.peers[1].open( + docs.address, + { args: { role: "observer" } } + ); + await docsObserver.documents.log.waitForReplicator( + session.peers[0].identity.publicKey + ); + + const results = await docsObserver.documents.index.search( + new SearchRequest({ + query: new IntegerCompare({ + key: ["documents", "number"], + compare: Compare.Equal, + value: 123n + }) + }) + ); + expect(results.map((x) => x.id)).toEqual([new Uint8Array(d1.id)]); + }); + }); + }); + + describe("canRead", () => { + it("no read access will return a response with 0 results", async () => { + const canReadInvocation: [Document, PublicSignKey][] = []; + canRead[0] = (a, b) => { + canReadInvocation.push([a, b]); + return Promise.resolve(false); + }; + let allResponses: AbstractSearchResult[] = []; + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [] + }), + { + local: false, + remote: { + onResponse: (r) => { + allResponses.push(r); + } + } + } ); - expect(results.map((x) => x.id)).toEqual([new Uint8Array(d1.id)]); + expect(responses).toHaveLength(0); + expect(allResponses).toHaveLength(1); + expect(allResponses[0]).toBeInstanceOf(Results); + expect(canReadInvocation).toHaveLength(4); 
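// canRead runs once per matched document after the query executes,
// so all four stored documents are checked even though the filtered
// response itself comes back empty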
// 4 documents in store + expect(canReadInvocation[0][0]).toBeInstanceOf(Document); + expect(canReadInvocation[0][1]).toBeInstanceOf(Ed25519PublicKey); }); + }); - it("number[][]", async () => { - const docs = await session.peers[0].open( - new MultiDimensionalDocStore() + describe("canSearch", () => { + it("no search access will return an error response", async () => { + const canSearchInvocations: [ + AbstractSearchRequest, + PublicSignKey + ][] = []; + canSearch[0] = (a, b) => { + canSearchInvocations.push([a, b]); + return Promise.resolve(false); + }; + let allResponses: AbstractSearchResult[] = []; + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [] + }), + { + local: false, + remote: { + amount: 1, + onResponse: (r) => { + allResponses.push(r); + } + } + } ); + expect(responses).toHaveLength(0); + expect(allResponses).toHaveLength(1); + expect(allResponses[0]).toBeInstanceOf(NoAccess); + expect(canSearchInvocations).toHaveLength(1); + expect(canSearchInvocations[0][0]).toBeInstanceOf(SearchRequest); + expect(canSearchInvocations[0][1]).toBeInstanceOf(Ed25519PublicKey); + }); + }); - const d1 = new MultiDimensionalDoc({ matrix: [[1, 2], [3]] }); - await docs.documents.put(d1); - await docs.documents.put( - new MultiDimensionalDoc({ matrix: [[4, 5]] }) + describe("logical", () => { + it("and", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new And([ + new StringMatch({ + key: "name", + value: "hello", + caseInsensitive: true, + method: StringMatchMethod.contains + }), + new StringMatch({ + key: "name", + value: "world", + caseInsensitive: true, + method: StringMatchMethod.contains + }) + ]) + ] + }), + { remote: { amount: 1 } } ); + expect(responses).toHaveLength(2); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["1", "2"]); + }); - const docsObserver = - await session.peers[1].open( - docs.address, - { args: { role: "observer" } } - ); - await docsObserver.documents.log.waitForReplicator( - session.peers[0].identity.publicKey + it("or", async () => { + let responses: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new Or([ + new ByteMatchQuery({ + key: "id", + value: Buffer.from("1") + }), + new ByteMatchQuery({ + key: "id", + value: Buffer.from("2") + }) + ]) + ] + }), + { remote: { amount: 1 } } ); + expect(responses).toHaveLength(2); + expect( + responses.map((x) => Buffer.from(x.id).toString("utf8")) + ).toContainAllValues(["1", "2"]); + }); + }); - const results = await docsObserver.documents.index.search( + describe("number", () => { + it("equal", async () => { + let response: Document[] = await stores[1].docs.index.search( new SearchRequest({ - query: new IntegerCompare({ - key: "matrix", - compare: Compare.Equal, - value: 2 - }) - }) + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.Equal, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } ); - expect(results.map((x) => x.id)).toEqual([new Uint8Array(d1.id)]); + expect(response).toHaveLength(1); + expect(response[0].number).toEqual(2n); }); - it("Document[]", async () => { - const docs = await session.peers[0].open( - new MultiDimensionalDocStore() + it("gt", async () => { + let response: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.Greater, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } ); + 
expect(response).toHaveLength(1); + expect(response[0].number).toEqual(3n); + }); - const d1 = new MultiDimensionalDoc({ - documents: [new Document({ id: randomBytes(32), number: 123n })] - }); - await docs.documents.put(d1); - await docs.documents.put( - new MultiDimensionalDoc({ - documents: [new Document({ id: randomBytes(32), number: 124n })] - }) + it("gte", async () => { + let response: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.GreaterOrEqual, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } ); + response.sort((a, b) => + bigIntSort(a.number as bigint, b.number as bigint) + ); + expect(response).toHaveLength(2); + expect(response[0].number).toEqual(2n); + expect(response[1].number).toEqual(3n); + }); - const docsObserver = - await session.peers[1].open( - docs.address, - { args: { role: "observer" } } - ); - await docsObserver.documents.log.waitForReplicator( - session.peers[0].identity.publicKey + it("lt", async () => { + let response: Document[] = await stores[1].docs.index.search( + new SearchRequest({ + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.Less, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } ); + expect(response).toHaveLength(1); + expect(response[0].number).toEqual(1n); + }); - const results = await docsObserver.documents.index.search( + it("lte", async () => { + let response: Document[] = await stores[1].docs.index.search( new SearchRequest({ - query: new IntegerCompare({ - key: ["documents", "number"], - compare: Compare.Equal, - value: 123n - }) - }) + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.LessOrEqual, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } + ); + response.sort((a, b) => + bigIntSort(a.number as bigint, b.number as bigint) ); - expect(results.map((x) => x.id)).toEqual([new Uint8Array(d1.id)]); + expect(response).toHaveLength(2); + expect(response[0].number).toEqual(1n); + expect(response[1].number).toEqual(2n); }); }); - }); - describe("canRead", () => { - it("no read access will return a response with 0 results", async () => { - const canReadInvocation: [Document, PublicSignKey][] = []; - canRead[0] = (a, b) => { - canReadInvocation.push([a, b]); - return Promise.resolve(false); - }; - let allResponses: AbstractSearchResult[] = []; - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [] - }), - { - local: false, - remote: { - onResponse: (r) => { - allResponses.push(r); - } + describe("concurrently", () => { + it("can query concurrently", async () => { + // TODO add more concurrency + let promises: Promise[] = []; + let concurrency = 100; + for (let i = 0; i < concurrency; i++) { + if (i % 2 === 0) { + promises.push( + stores[1].docs.index.search( + new SearchRequest({ + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.GreaterOrEqual, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } + ) + ); + } else { + promises.push( + stores[1].docs.index.search( + new SearchRequest({ + query: [ + new IntegerCompare({ + key: "number", + compare: Compare.Less, + value: 2n + }) + ] + }), + { remote: { amount: 1 } } + ) + ); } } - ); - expect(responses).toHaveLength(0); - expect(allResponses).toHaveLength(1); - expect(allResponses[0]).toBeInstanceOf(Results); - expect(canReadInvocation).toHaveLength(4); // 4 documents in store - expect(canReadInvocation[0][0]).toBeInstanceOf(Document); - 
expect(canReadInvocation[0][1]).toBeInstanceOf(Ed25519PublicKey); - }); - }); - describe("canSearch", () => { - it("no search access will return an error response", async () => { - const canSearchInvocations: [AbstractSearchRequest, PublicSignKey][] = - []; - canSearch[0] = (a, b) => { - canSearchInvocations.push([a, b]); - return Promise.resolve(false); - }; - let allResponses: AbstractSearchResult[] = []; - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [] - }), - { - local: false, - remote: { - amount: 1, - onResponse: (r) => { - allResponses.push(r); - } + let results = await Promise.all(promises); + for (let i = 0; i < concurrency; i++) { + if (i % 2 === 0) { + // query1 + expect(results[i]).toHaveLength(2); + results[i].sort((a, b) => Number(a.number! - b.number!)); + expect(results[i][0].number === 2n).toBeTrue(); // Jest can't seem to output BN if error, so we do equals manually + expect(results[i][1].number === 3n).toBeTrue(); // Jest can't seem to output BN if error, so we do equals manually + } else { + // query2 + expect(results[i]).toHaveLength(1); + expect(results[i][0].number === 1n).toBeTrue(); } } - ); - expect(responses).toHaveLength(0); - expect(allResponses).toHaveLength(1); - expect(allResponses[0]).toBeInstanceOf(NoAccess); - expect(canSearchInvocations).toHaveLength(1); - expect(canSearchInvocations[0][0]).toBeInstanceOf(SearchRequest); - expect(canSearchInvocations[0][1]).toBeInstanceOf(Ed25519PublicKey); - }); - }); - - describe("logical", () => { - it("and", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new And([ - new StringMatch({ - key: "name", - value: "hello", - caseInsensitive: true, - method: StringMatchMethod.contains - }), - new StringMatch({ - key: "name", - value: "world", - caseInsensitive: true, - method: StringMatchMethod.contains - }) - ]) - ] - }), - { remote: { amount: 1 } } - ); - expect(responses).toHaveLength(2); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["1", "2"]); - }); - - it("or", async () => { - let responses: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new Or([ - new ByteMatchQuery({ - key: "id", - value: Buffer.from("1") - }), - new ByteMatchQuery({ - key: "id", - value: Buffer.from("2") - }) - ]) - ] - }), - { remote: { amount: 1 } } - ); - expect(responses).toHaveLength(2); - expect( - responses.map((x) => Buffer.from(x.id).toString("utf8")) - ).toContainAllValues(["1", "2"]); + }); }); }); - describe("number", () => { - it("equal", async () => { - let response: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.Equal, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } - ); - expect(response).toHaveLength(1); - expect(response[0].number).toEqual(2n); + describe("limited", () => { + let peersCount = 2; + let writeStore: TestStore; + let readStore: TestStore; + beforeAll(async () => { + session = await TestSession.connected(peersCount); }); - it("gt", async () => { - let response: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.Greater, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } - ); - expect(response).toHaveLength(1); - expect(response[0].number).toEqual(3n); + afterAll(async () => { + await session.stop(); }); - it("gte", async () => { - let 
response: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.GreaterOrEqual, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } - ); - response.sort((a, b) => - bigIntSort(a.number as bigint, b.number as bigint) + beforeEach(async () => { + writeStore = new TestStore({ + docs: new Documents() + }); + await session.peers[0].open(writeStore, { + args: { + role: { + type: "replicator", + factor: 1 + } + } + }); + readStore = await session.peers[1].open( + writeStore.address, + { + args: { + role: "observer" + } + } ); - expect(response).toHaveLength(2); - expect(response[0].number).toEqual(2n); - expect(response[1].number).toEqual(3n); }); - - it("lt", async () => { - let response: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.Less, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } - ); - expect(response).toHaveLength(1); - expect(response[0].number).toEqual(1n); + afterEach(async () => { + await writeStore.close(); + await readStore.close(); }); - it("lte", async () => { - let response: Document[] = await stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.LessOrEqual, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } + it("can handle large document limits", async () => { + for (let i = 0; i < 10; i++) { + const doc = new Document({ + id: new Uint8Array([i]), + data: randomBytes(5e6 - 100) + }); + await writeStore.docs.put(doc); + } + await readStore.docs.log.waitForReplicator( + session.peers[0].identity.publicKey ); - response.sort((a, b) => - bigIntSort(a.number as bigint, b.number as bigint) + const collected = await readStore.docs.index.search( + new SearchRequest() ); - expect(response).toHaveLength(2); - expect(response[0].number).toEqual(1n); - expect(response[1].number).toEqual(2n); - }); - }); - - describe("concurrently", () => { - it("can query concurrently", async () => { - // TODO add more concurrency - let promises: Promise[] = []; - let concurrency = 100; - for (let i = 0; i < concurrency; i++) { - if (i % 2 === 0) { - promises.push( - stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.GreaterOrEqual, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } - ) - ); - } else { - promises.push( - stores[1].docs.index.search( - new SearchRequest({ - query: [ - new IntegerCompare({ - key: "number", - compare: Compare.Less, - value: 2n - }) - ] - }), - { remote: { amount: 1 } } - ) - ); - } - } - - let results = await Promise.all(promises); - for (let i = 0; i < concurrency; i++) { - if (i % 2 === 0) { - // query1 - expect(results[i]).toHaveLength(2); - results[i].sort((a, b) => Number(a.number! 
- b.number!)); - expect(results[i][0].number === 2n).toBeTrue(); // Jest can't seem to output BN if error, so we do equals manually - expect(results[i][1].number === 3n).toBeTrue(); // Jest can't seem to output BN if error, so we do equals manually - } else { - // query2 - expect(results[i]).toHaveLength(1); - expect(results[i][0].number === 1n).toBeTrue(); - } - } + expect(collected).toHaveLength(10); }); }); }); @@ -2578,10 +2671,10 @@ describe("index", () => { factor: 1 }) await stores[2].docs.updateRole({type: 'replicator', factor: 1}) - + await delay(2000) await waitForResolved(() => expect(stores[0].docs.log.getReplicatorsSorted()?.toArray().map(x => x.publicKey.hashcode())).toContainAllValues([session.peers[1].identity.publicKey.hashcode(), session.peers[2].identity.publicKey.hashcode()])); - + const t1 = +new Date(); const minAge = 1000; await stores[0].docs.index.search(new SearchRequest({ query: [] }), { @@ -2591,7 +2684,7 @@ describe("index", () => { expect(counters[1]).toEqual(1); // but now also remotely since we can not trust local only expect(counters[2]).toEqual(0); await waitFor(() => +new Date() - t1 > minAge + 100); - + await stores[0].docs.index.search(new SearchRequest({ query: [] }), { remote: { minAge } }); diff --git a/packages/programs/data/document/src/document-index.ts b/packages/programs/data/document/src/document-index.ts index 5e0941c66..823f17fc3 100644 --- a/packages/programs/data/document/src/document-index.ts +++ b/packages/programs/data/document/src/document-index.ts @@ -1,6 +1,6 @@ import { AbstractType, field, serialize, variant } from "@dao-xyz/borsh"; import { asString, Keyable } from "./utils.js"; -import { BORSH_ENCODING, Encoding } from "@peerbit/log"; +import { BORSH_ENCODING, Encoding, Entry } from "@peerbit/log"; import { equals } from "@peerbit/uint8arrays"; import { Program } from "@peerbit/program"; import { @@ -127,7 +127,7 @@ export interface IndexedValue { key: string; value: Record | T; // decrypted, decoded context: Context; - reference?: T; + reference?: ValueWithLastOperation; } export type RemoteQueryOptions = RPCOptions & { @@ -149,6 +149,10 @@ export type ResultsIterator = { next: (number: number) => Promise; done: () => boolean; }; +type ValueWithLastOperation = { + value: T; + last: PutOperation>; +}; const sortCompare = (av: any, bv: any) => { if (typeof av === "string" && typeof bv === "string") { @@ -189,7 +193,10 @@ const extractSortCompare = ( return 0; }; -const resolvedSort = async ( +const resolvedSort = async < + T, + Q extends { value: { value: T }; context: Context } +>( arr: Q[], index: IndexableFields, sorts: Sort[] @@ -197,7 +204,7 @@ const resolvedSort = async ( await Promise.all( arr.map( async (result) => - (result[SORT_TMP_KEY] = await index(result.value, result.context)) + (result[SORT_TMP_KEY] = await index(result.value.value, result.context)) ) ); arr.sort((a, b) => @@ -266,6 +273,28 @@ const DEFAULT_REPLICATOR_MIN_AGE = 10 * 1000; // how long, until we consider a p if (!(await this.canRead(message.sender))) { throw new AccessError(); } */ +export const MAX_DOCUMENT_SIZE = 5e6; + +const getBatchFromResults = ( + results: { value: ValueWithLastOperation; context: Context }[], + wantedSize: number, + maxSize: number = MAX_DOCUMENT_SIZE +) => { + const batch: { value: ValueWithLastOperation; context: Context }[] = []; + let size = 0; + for (const result of results) { + batch.push(result); + size += result.value.last.data.length; + if (size > maxSize) { + break; + } + if (wantedSize <= batch.length) { + break; + } 
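// an entry is pushed before the size check, so a single document
// larger than maxSize still ships alone instead of stalling the queue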
+ } + results.splice(0, batch.length); + return batch; +}; export type CanSearch = ( request: SearchRequest | CollectNextRequest, @@ -277,11 +306,11 @@ export type CanRead = ( from: PublicSignKey ) => Promise | boolean; -export type IndexedDB = { index: DocumentIndex }; +export type InMemoryIndex = { index: DocumentIndex }; export type OpenOptions = { type: AbstractType; - dbType: AbstractType>; + dbType: AbstractType>; log: SharedLog>; canRead?: CanRead; canSearch?: CanSearch; @@ -296,7 +325,7 @@ export class DocumentIndex extends Program> { _query: RPC>; type: AbstractType; - dbType: AbstractType>; + dbType: AbstractType>; // Index key private _indexBy: string | string[]; @@ -314,7 +343,7 @@ export class DocumentIndex extends Program> { private _index: Map>; private _resultsCollectQueue: Cache<{ from: PublicSignKey; - arr: { value: T; context: Context }[]; + arr: { value: ValueWithLastOperation; context: Context }[]; }>; private _log: SharedLog>; @@ -392,13 +421,19 @@ export class DocumentIndex extends Program> { return new Results({ // Even if results might have length 0, respond, because then we now at least there are no matching results - results: results.results.map( - (r) => - new ResultWithSource({ - source: serialize(r.value), - context: r.context - }) - ), + results: results.results.map((r) => { + if (r.value.last instanceof PutOperation === false) { + throw new Error( + "Unexpected value type on local results: " + + (r.value.last as any)?.constructor.name || + typeof r.value.last + ); + } + return new ResultWithSource({ + source: r.value.last.data, + context: r.context + }); + }), kept: BigInt(results.kept) }); } @@ -449,19 +484,21 @@ export class DocumentIndex extends Program> { return this._index.size; } - async getDocument(value: { - reference?: T; + private async getDocumentWithLastOperation(value: { + reference?: ValueWithLastOperation; context: { head: string }; - }): Promise { + }): Promise> { if (value.reference) { return value.reference; } - const payloadValue = await (await this._log.log.get( - value.context.head - ))!.getPayloadValue(); + const head = await (await this._log.log.get(value.context.head))!; + const payloadValue = await head.getPayloadValue(); if (payloadValue instanceof PutOperation) { - return payloadValue.getValue(this.valueEncoding); + return { + value: payloadValue.getValue(this.valueEncoding), + last: payloadValue + }; } throw new Error( @@ -470,16 +507,24 @@ export class DocumentIndex extends Program> { ); } + getDocument(value: { + reference?: ValueWithLastOperation; + context: { head: string }; + }) { + return this.getDocumentWithLastOperation(value).then((r) => r.value); + } + async _queryDocuments( filter: (doc: IndexedValue) => Promise - ): Promise<{ context: Context; value: T }[]> { + ): Promise<{ context: Context; value: ValueWithLastOperation }[]> { // Whether we return the full operation data or just the db value - const results: { context: Context; value: T }[] = []; + const results: { context: Context; value: ValueWithLastOperation }[] = + []; for (const value of this._index.values()) { if (await filter(value)) { results.push({ context: value.context, - value: await this.getDocument(value) + value: await this.getDocumentWithLastOperation(value) }); } } @@ -492,7 +537,10 @@ export class DocumentIndex extends Program> { options?: { canRead?: CanRead; } - ): Promise<{ results: { context: Context; value: T }[]; kept: number }> { + ): Promise<{ + results: { context: Context; value: ValueWithLastOperation }[]; + kept: number; + }> { // We do 
special case for querying the id as we can do it faster than iterating if (query instanceof SearchRequest) { @@ -510,7 +558,7 @@ export class DocumentIndex extends Program> { ? { results: [ { - value: await this.getDocument(doc), + value: await this.getDocumentWithLastOperation(doc), context: doc.context } ], @@ -527,7 +575,7 @@ export class DocumentIndex extends Program> { ? { results: [ { - value: await this.getDocument(doc), + value: await this.getDocumentWithLastOperation(doc), context: doc.context } ], @@ -549,7 +597,7 @@ export class DocumentIndex extends Program> { if (options?.canRead) { const keepFilter = await Promise.all( - results.map((x) => options?.canRead!(x.value, from)) + results.map((x) => options?.canRead!(x.value.value, from)) ); results = results.filter((x, i) => keepFilter[i]); } @@ -557,7 +605,8 @@ export class DocumentIndex extends Program> { // Sort await resolvedSort(results, this._toIndex, query.sort); - const batch = results.splice(0, query.fetch); + const batch = getBatchFromResults(results, query.fetch); + if (results.length > 0) { this._resultsCollectQueue.add(query.idString, { arr: results, @@ -575,7 +624,7 @@ export class DocumentIndex extends Program> { }; } - const batch = results.arr.splice(0, query.amount); + const batch = getBatchFromResults(results.arr, query.amount); if (results.arr.length === 0) { this._resultsCollectQueue.del(query.idString); // TODO add tests for proper cleanup/timeouts @@ -630,7 +679,7 @@ export class DocumentIndex extends Program> { if (obj instanceof this.dbType) { const queryCloned = f.clone(); queryCloned.key.splice(0, i + 1); // remove key path until the document store - const results = await (obj as any as IndexedDB).index.search( + const results = await (obj as any as InMemoryIndex).index.search( new SearchRequest({ query: [queryCloned] }) ); return results.length > 0 ? true : false; // TODO return INNER HITS? 
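The size-capped batching introduced above is the core of this patch: both the SearchRequest and CollectNextRequest paths now hand leftover results to the collect queue instead of splicing off a fixed count. A minimal runnable sketch of that behavior follows; takeBatch is a simplified stand-in for the patch's getBatchFromResults, and the 3 MB payloads are invented for illustration.

type Sized = { value: { last: { data: Uint8Array } } };

// Drain up to `wanted` results, stopping once the running payload size
// passes `maxSize`. An entry is pushed before the check, mirroring the
// diff, so one oversized entry can still go out alone.
const takeBatch = (queue: Sized[], wanted: number, maxSize = 5e6): Sized[] => {
	const batch: Sized[] = [];
	let size = 0;
	for (const result of queue) {
		batch.push(result);
		size += result.value.last.data.length;
		if (size > maxSize || batch.length >= wanted) {
			break;
		}
	}
	queue.splice(0, batch.length); // leftovers wait for a CollectNextRequest
	return batch;
};

// Three 3 MB entries with fetch = 3: only two fit under the 5 MB cap,
// so the third is kept for the follow-up collect round.
const queue = Array.from({ length: 3 }, () => ({
	value: { last: { data: new Uint8Array(3e6) } }
}));
console.log(takeBatch(queue, 3).length); // 2
console.log(queue.length); // 1

This is also why the search() hunk below loops on iterator.next() until fetch is satisfied: a single response may now carry fewer results than requested.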
@@ -757,24 +806,19 @@ export class DocumentIndex extends Program> { ); if (results.results.length > 0) { const resultsObject = new Results({ - results: await Promise.all( - results.results.map(async (r) => { - const payloadValue = await ( - await this._log.log.get(r.context.head) - )?.getPayloadValue(); - if (payloadValue instanceof PutOperation) { - return new ResultWithSource({ - context: r.context, - value: r.value, - source: payloadValue.data - }); - } + results: results.results.map((r) => { + if (r.value.last instanceof PutOperation === false) { throw new Error( "Unexpected value type on local results: " + - payloadValue?.constructor.name || typeof payloadValue + (r.value.last as any)?.constructor.name || typeof r.value.last ); - }) - ), + } + return new ResultWithSource({ + context: r.context, + value: r.value.value, + source: r.value.last.data + }); + }), kept: BigInt(results.kept) }); options?.onResponse && @@ -879,12 +923,23 @@ export class DocumentIndex extends Program> { const iterator = this.iterate(queryRequest, options); // So that this call will not do any remote requests - const allResult = await iterator.next(queryRequest.fetch); + const allResults: T[] = []; + while ( + iterator.done() === false && + queryRequest.fetch > allResults.length + ) { + // We might need to pull .next multiple time due to data message size limitations + for (const result of await iterator.next( + queryRequest.fetch - allResults.length + )) { + allResults.push(result); + } + } await iterator.close(); //s Deduplicate and return values directly - return dedup(allResult, this.indexByResolver); + return dedup(allResults, this.indexByResolver); } /** @@ -902,7 +957,11 @@ export class DocumentIndex extends Program> { string, { kept: number; - buffer: { value: T; context: Context; from: PublicSignKey }[]; + buffer: { + value: { value: T }; + context: Context; + from: PublicSignKey; + }[]; } > = new Map(); const visited = new Set(); @@ -914,7 +973,7 @@ export class DocumentIndex extends Program> { const controller = new AbortController(); const peerBuffers = (): { - value: T; + value: { value: T }; from: PublicSignKey; context: Context; }[] => { @@ -935,22 +994,27 @@ export class DocumentIndex extends Program> { logger.error("Dont have access"); return; } else if (response instanceof Results) { - if (response.kept === 0n && response.results.length === 0) { + const results = response as Results; + if (results.kept === 0n && results.results.length === 0) { return; } - if (response.kept > 0n) { + if (results.kept > 0n) { done = false; // we have more to do later! } peerBufferMap.set(from.hashcode(), { - buffer: response.results + buffer: results.results .filter( (x) => !visited.has(asString(this.indexByResolver(x.value))) ) .map((x) => { visited.add(asString(this.indexByResolver(x.value))); - return { from, value: x.value, context: x.context }; + return { + from, + value: { value: x.value }, + context: x.context + }; }), kept: Number(response.kept) }); @@ -1021,11 +1085,13 @@ export class DocumentIndex extends Program> { .filter( (x) => !visited.has( - asString(this.indexByResolver(x.value)) + asString(this.indexByResolver(x.value.value)) ) ) .map((x) => { - visited.add(asString(this.indexByResolver(x.value))); + visited.add( + asString(this.indexByResolver(x.value.value)) + ); return { value: x.value, context: x.context, @@ -1037,7 +1103,7 @@ export class DocumentIndex extends Program> { }) .catch((e) => { logger.error( - "Failed to collect sorted results self. 
" + e?.message + "Failed to collect sorted results from self. " + e?.message ); peerBufferMap.delete(peer); }) @@ -1084,7 +1150,7 @@ export class DocumentIndex extends Program> { asString(this.indexByResolver(x.value)) ); return { - value: x.value, + value: { value: x.value }, context: x.context, from: response.from! }; @@ -1151,7 +1217,7 @@ export class DocumentIndex extends Program> { done = fetchedAll && !pendingMoreResults; return dedup( - batch.map((x) => x.value), + batch.map((x) => x.value.value), this.indexByResolver ); }; diff --git a/packages/programs/data/document/src/document-store.ts b/packages/programs/data/document/src/document-store.ts index b909220ba..eba6a5d77 100644 --- a/packages/programs/data/document/src/document-store.ts +++ b/packages/programs/data/document/src/document-store.ts @@ -32,10 +32,12 @@ import { PutOperation, CanSearch, CanRead, - IndexedDB + InMemoryIndex, + MAX_DOCUMENT_SIZE } from "./document-index.js"; import { asString, checkKeyable, Keyable } from "./utils.js"; import { Context, Results } from "./query.js"; +export { MAX_DOCUMENT_SIZE }; const logger = loggerFn({ module: "document" }); @@ -81,7 +83,7 @@ export type SetupOptions = { @variant("documents") export class Documents> extends Program, DocumentEvents & ProgramEvents> - implements IndexedDB + implements InMemoryIndex { @field({ type: SharedLog }) log: SharedLog>; @@ -311,6 +313,14 @@ export class Documents> const key = this._index.indexByResolver(doc as any as Keyable); checkKeyable(key); const ser = serialize(doc); + if (ser.length > MAX_DOCUMENT_SIZE) { + throw new Error( + `Document is too large (${ + ser.length * 1e-6 + }) mb). Needs to be less than ${MAX_DOCUMENT_SIZE * 1e-6} mb` + ); + } + const existingDocument = options?.unique ? undefined : ( @@ -449,7 +459,7 @@ export class Documents> context, reference: valueToIndex === value || value instanceof Program - ? value + ? { value, last: payload } : undefined }); } else if (