Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allowing searching many documents #246

Merged
merged 5 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,621 changes: 857 additions & 764 deletions packages/programs/data/document/src/__tests__/index.integration.test.ts

Large diffs are not rendered by default.

186 changes: 126 additions & 60 deletions packages/programs/data/document/src/document-index.ts

Large diffs are not rendered by default.

16 changes: 13 additions & 3 deletions packages/programs/data/document/src/document-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ import {
PutOperation,
CanSearch,
CanRead,
IndexedDB
InMemoryIndex,
MAX_DOCUMENT_SIZE
} from "./document-index.js";
import { asString, checkKeyable, Keyable } from "./utils.js";
import { Context, Results } from "./query.js";
export { MAX_DOCUMENT_SIZE };

const logger = loggerFn({ module: "document" });

Expand Down Expand Up @@ -81,7 +83,7 @@ export type SetupOptions<T> = {
@variant("documents")
export class Documents<T extends Record<string, any>>
extends Program<SetupOptions<T>, DocumentEvents<T> & ProgramEvents>
implements IndexedDB<T>
implements InMemoryIndex<T>
{
@field({ type: SharedLog })
log: SharedLog<Operation<T>>;
Expand Down Expand Up @@ -311,6 +313,14 @@ export class Documents<T extends Record<string, any>>
const key = this._index.indexByResolver(doc as any as Keyable);
checkKeyable(key);
const ser = serialize(doc);
if (ser.length > MAX_DOCUMENT_SIZE) {
throw new Error(
`Document is too large (${
ser.length * 1e-6
}) mb). Needs to be less than ${MAX_DOCUMENT_SIZE * 1e-6} mb`
);
}

const existingDocument = options?.unique
? undefined
: (
Expand Down Expand Up @@ -449,7 +459,7 @@ export class Documents<T extends Record<string, any>>
context,
reference:
valueToIndex === value || value instanceof Program
? value
? { value, last: payload }
: undefined
});
} else if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,27 @@ describe(`exchange`, function () {
}
}
});
it("replicates database of large entries", async () => {
let count = 10;
for (let i = 0; i < count; i++) {
const value = toBase64(randomBytes(4e6));
await db1.add(value, { meta: { next: [] } }); // force unique heads
}
db2 = (await EventStore.open<EventStore<string>>(
db1.address!,
session.peers[1],
{
args: {
role: {
type: "replicator",
factor: 1
}
}
}
))!;

await waitForResolved(() => expect(db2.log.log.length).toEqual(count));
});

describe("redundancy", () => {
it("only sends entries once", async () => {
Expand Down
1 change: 1 addition & 0 deletions packages/programs/data/shared-log/src/exchange-heads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ export const createExchangeHeadsMessages = async (

size += fromHead.size;
if (size > MAX_EXCHANGE_MESSAGE_SIZE) {
size = 0;
messages.push(
new ExchangeHeadsMessage({
heads: current
Expand Down
15 changes: 10 additions & 5 deletions packages/transport/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,12 @@ import { getPublicKeyFromPeerId, PublicSignKey } from "@peerbit/crypto";
import { CustomEvent } from "@libp2p/interface";

export const logger = logFn({ module: "lazysub", level: "warn" });
const logError = (e?: { message: string }) => logger.error(e?.message);
const logError = (e?: { message: string }) => {
logger.error(e?.message);
};
const logErrorIfStarted = (e?: { message: string }) => {
e instanceof NotStartedError === false && logError(e);
};

export interface PeerStreamsInit {
id: Libp2pPeerId;
Expand Down Expand Up @@ -553,7 +558,7 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
message.header.mode instanceof SeekDelivery
) {
// DONT await this since it might introduce a dead-lock
this.relayMessage(from, message).catch(logError);
this.relayMessage(from, message).catch(logErrorIfStarted);
}
} else {
if ((await this.verifyAndProcess(message)) === false) {
Expand Down Expand Up @@ -657,7 +662,7 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {

// Forward
// DONT await this since it might introduce a dead-lock
this.relayMessage(from, message).catch(logError);
this.relayMessage(from, message).catch(logErrorIfStarted);
} else if (pubsubMessage instanceof Unsubscribe) {
if (this.subscriptionMessageIsLatest(message, pubsubMessage)) {
const changed: string[] = [];
Expand Down Expand Up @@ -693,7 +698,7 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
}

// DONT await this since it might introduce a dead-lock
this.relayMessage(from, message).catch(logError);
this.relayMessage(from, message).catch(logErrorIfStarted);
} else if (pubsubMessage instanceof GetSubscribers) {
const subscriptionsToSend: string[] = this.getSubscriptionOverlap(
pubsubMessage.topics
Expand Down Expand Up @@ -723,7 +728,7 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {

// Forward
// DONT await this since it might introduce a dead-lock
this.relayMessage(from, message).catch(logError);
this.relayMessage(from, message).catch(logErrorIfStarted);
}
}
return true;
Expand Down
6 changes: 5 additions & 1 deletion packages/transport/stream-interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ export interface WaitForPeer {
): Promise<void>;
}

export class NotStartedError extends Error {}
export class NotStartedError extends Error {
constructor() {
super("Not started");
}
}
31 changes: 16 additions & 15 deletions packages/transport/stream/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import {

import { MultiAddrinfo } from "@peerbit/stream-interface";
import { BandwidthTracker } from "./stats.js";
export { BandwidthTracker }; // might be useful for others

const logError = (e?: { message: string }) => {
return logger.error(e?.message);
Expand Down Expand Up @@ -618,35 +619,35 @@ export abstract class DirectStream<
return;
}

// reset and clear up

this.started = false;

this.closeController.abort();

clearTimeout(this.pruneConnectionsTimeout);

await Promise.all(
this.multicodecs.map((x) => this.components.registrar.unhandle(x))
);

// unregister protocol and handlers
if (this._registrarTopologyIds != null) {
this._registrarTopologyIds?.map((id) =>
this.components.registrar.unregister(id)
);
}

// reset and clear up
this.started = false;

this.closeController.abort();

logger.debug("stopping");
for (const peerStreams of this.peers.values()) {
await peerStreams.close();
}

for (const [k, v] of this.healthChecks) {
clearTimeout(v);
}
this.healthChecks.clear();
this.prunedConnectionsCache?.clear();

// unregister protocol and handlers
if (this._registrarTopologyIds != null) {
this._registrarTopologyIds?.map((id) =>
this.components.registrar.unregister(id)
);
}

this.queue.clear();
this.peers.clear();
this.seenCache.clear();
Expand Down Expand Up @@ -1662,8 +1663,8 @@ export abstract class DirectStream<
to?: PeerStreams[] | Map<string, PeerStreams>,
relayed?: boolean
): Promise<void> {
if (this.stopping) {
return;
if (this.stopping || !this.started) {
throw new NotStartedError();
}

let delivereyPromise: Promise<void> | undefined = undefined as any;
Expand Down