From a3c6008128a74d0a21362eb61deebd9770ceda5c Mon Sep 17 00:00:00 2001 From: Adil Ansari Date: Mon, 3 Oct 2022 16:40:21 -0700 Subject: [PATCH] refactor: Updates for collections API (#96) --- src/__tests__/cursor/cursor.spec.ts | 5 +- src/__tests__/tigris.rpc.spec.ts | 16 +- src/__tests__/tigris.utility.spec.ts | 1 - src/collection.ts | 233 ++++++++++++++++++--------- src/cursor/abstract-cursor.ts | 22 ++- src/search/types.ts | 4 +- src/types.ts | 42 +++-- src/utility.ts | 22 +++ 8 files changed, 239 insertions(+), 106 deletions(-) diff --git a/src/__tests__/cursor/cursor.spec.ts b/src/__tests__/cursor/cursor.spec.ts index 755d45c..1e0c288 100644 --- a/src/__tests__/cursor/cursor.spec.ts +++ b/src/__tests__/cursor/cursor.spec.ts @@ -6,7 +6,6 @@ import {Tigris} from "../../tigris"; import {TigrisCursorInUseError} from "../../error"; import {ObservabilityService} from "../../proto/server/v1/observability_grpc_pb"; import TestObservabilityService from "../test-observability-service"; -import {ReadResponse} from "../../proto/server/v1/api_pb"; import {DB} from "../../db"; describe("class FindCursor", () => { @@ -75,8 +74,8 @@ describe("class FindCursor", () => { it("does not allow cursor to be re-used", () => { const cursor = db.getCollection("books").findMany(); - // cursor.stream() is a generator fn, calling next() would retrieve item from stream - cursor.stream().next(); + // cursor is backed by is a generator fn, calling next() would retrieve item from stream + cursor[Symbol.asyncIterator]().next(); expect(() => cursor.toArray()).toThrow(TigrisCursorInUseError); }) diff --git a/src/__tests__/tigris.rpc.spec.ts b/src/__tests__/tigris.rpc.spec.ts index d8b42b1..15fcce3 100644 --- a/src/__tests__/tigris.rpc.spec.ts +++ b/src/__tests__/tigris.rpc.spec.ts @@ -167,7 +167,7 @@ describe("rpc tests", () => { it("insert", () => { const tigris = new Tigris({serverUrl: "0.0.0.0:" + SERVER_PORT, insecureChannel: true}); const db1 = tigris.getDatabase("db3"); - const insertionPromise = db1.getCollection("books").insert({ + const insertionPromise = db1.getCollection("books").insertOne({ author: "author name", id: 0, tags: ["science"], @@ -182,7 +182,7 @@ describe("rpc tests", () => { it("insert2", () => { const tigris = new Tigris({serverUrl: "0.0.0.0:" + SERVER_PORT, insecureChannel: true}); const db1 = tigris.getDatabase("db3"); - const insertionPromise = db1.getCollection("books").insert({ + const insertionPromise = db1.getCollection("books").insertOne({ id: 0, title: "science book", metadata: { @@ -202,7 +202,7 @@ describe("rpc tests", () => { const randomNumber: number = Math.floor(Math.random() * 100); // pass the random number in author field. mock server reads author and sets as the // primaryKey field. - const insertionPromise = db1.getCollection("books-with-optional-field").insert({ + const insertionPromise = db1.getCollection("books-with-optional-field").insertOne({ author: "" + randomNumber, tags: ["science"], title: "science book" @@ -216,7 +216,7 @@ describe("rpc tests", () => { it("insertOrReplace", () => { const tigris = new Tigris({serverUrl: "0.0.0.0:" + SERVER_PORT, insecureChannel: true}); const db1 = tigris.getDatabase("db3"); - const insertOrReplacePromise = db1.getCollection("books").insertOrReplace({ + const insertOrReplacePromise = db1.getCollection("books").insertOrReplaceOne({ author: "author name", id: 0, tags: ["science"], @@ -234,7 +234,7 @@ describe("rpc tests", () => { const randomNumber: number = Math.floor(Math.random() * 100); // pass the random number in author field. mock server reads author and sets as the // primaryKey field. - const insertOrReplacePromise = db1.getCollection("books-with-optional-field").insertOrReplace({ + const insertOrReplacePromise = db1.getCollection("books-with-optional-field").insertOrReplaceOne({ author: "" + randomNumber, tags: ["science"], title: "science book" @@ -285,7 +285,7 @@ describe("rpc tests", () => { it("readOne", () => { const tigris = new Tigris({serverUrl: "0.0.0.0:" + SERVER_PORT, insecureChannel: true}); const db1 = tigris.getDatabase("db3"); - const readOnePromise = db1.getCollection("books").findOne({ + const readOnePromise = db1.getCollection("books").findOne( { op: SelectorFilterOperator.EQ, fields: { id: 1 @@ -499,7 +499,7 @@ describe("rpc tests", () => { const txDB = tigris.getDatabase("test-tx"); const books = txDB.getCollection("books"); txDB.transact(tx => { - books.insert( + books.insertOne( { id: 1, author: "Alice", @@ -513,7 +513,7 @@ describe("rpc tests", () => { fields: { id: 1 } - }, tx).then(() => { + }, undefined, tx).then(() => { books.update({ op: SelectorFilterOperator.EQ, fields: { diff --git a/src/__tests__/tigris.utility.spec.ts b/src/__tests__/tigris.utility.spec.ts index 7003916..c92355d 100644 --- a/src/__tests__/tigris.utility.spec.ts +++ b/src/__tests__/tigris.utility.spec.ts @@ -10,7 +10,6 @@ import { SearchRequestOptions, SortOrder } from "../search/types"; -import {Collation} from "../proto/server/v1/api_pb"; describe("utility tests", () => { it("base64encode", () => { diff --git a/src/collection.ts b/src/collection.ts index 802f518..bbd78fa 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -7,8 +7,6 @@ import { EventsResponse as ProtoEventsResponse, InsertRequest as ProtoInsertRequest, ReadRequest as ProtoReadRequest, - ReadRequestOptions as ProtoReadRequestOptions, - ReadResponse as ProtoReadResponse, ReplaceRequest as ProtoReplaceRequest, SearchResponse as ProtoSearchResponse, UpdateRequest as ProtoUpdateRequest, @@ -18,14 +16,9 @@ import { DeleteRequestOptions, DeleteResponse, DMLMetadata, - EventsRequestOptions, - InsertOptions, - InsertOrReplaceOptions, - LogicalFilter, + Filter, ReadFields, ReadRequestOptions, - Selector, - SelectorFilter, SelectorFilterOperator, SimpleUpdateField, StreamEvent, @@ -39,14 +32,41 @@ import { SearchRequest, SearchRequestOptions, SearchResult } from "./search/type import { TigrisClientConfig } from "./tigris"; import { Cursor, ReadCursorInitializer } from "./cursor/cursor"; +/** + * Callback to receive events from server + */ export interface EventsCallback { + /** + * Receives a message from server. Can be called many times but is never called after + * {@link onError} or {@link onEnd} are called. + * + *

If an exception is thrown by an implementation, the caller is expected to terminate the + * stream by calling {@link onError} with the caught exception prior to propagating it. + * + * @param event + */ onNext(event: StreamEvent): void; + /** + * Receives a notification of successful stream completion. + * + *

May only be called once and if called it must be the last method called. In particular, + * if an exception is thrown by an implementation of {@link onEnd} no further calls to any + * method are allowed. + */ onEnd(): void; + /** + * Receives terminating error from the stream. + * @param error + */ onError(error: Error): void; } +/** + * The **Collection** class represents Tigris collection allowing insert/find/update/delete/search + * and events operations. + */ export class Collection { private readonly _collectionName: string; private readonly _db: string; @@ -65,12 +85,20 @@ export class Collection { this.config = config; } + /** + * Name of this collection + */ get collectionName(): string { return this._collectionName; } - // eslint-disable-next-line @typescript-eslint/no-unused-vars - insertMany(docs: Array, tx?: Session, _options?: InsertOptions): Promise> { + /** + * Inserts multiple documents in Tigris collection. + * + * @param docs - Array of documents to insert + * @param tx - Optional session information for transaction context + */ + insertMany(docs: Array, tx?: Session): Promise> { return new Promise>((resolve, reject) => { const docsArray = new Array(); for (const doc of docs) { @@ -109,11 +137,17 @@ export class Collection { }); } - insert(doc: T, tx?: Session, options?: InsertOptions): Promise { + /** + * Inserts a single document in Tigris collection. + * + * @param doc - Document to insert + * @param tx - Optional session information for transaction context + */ + insertOne(doc: T, tx?: Session): Promise { return new Promise((resolve, reject) => { const docArr: Array = new Array(); docArr.push(doc); - this.insertMany(docArr, tx, options) + this.insertMany(docArr, tx) .then((docs) => { resolve(docs[0]); }) @@ -123,11 +157,13 @@ export class Collection { }); } - insertOrReplaceMany( - docs: Array, - tx?: Session, - _options?: InsertOrReplaceOptions // eslint-disable-line @typescript-eslint/no-unused-vars - ): Promise> { + /** + * Insert new or replace existing documents in collection. + * + * @param docs - Array of documents to insert or replace + * @param tx - Optional session information for transaction context + */ + insertOrReplaceMany(docs: Array, tx?: Session): Promise> { return new Promise>((resolve, reject) => { const docsArray = new Array(); for (const doc of docs) { @@ -164,56 +200,33 @@ export class Collection { }); } - insertOrReplace(doc: T, tx?: Session, options?: InsertOptions): Promise { + /** + * Insert new or replace an existing document in collection. + * + * @param doc - Document to insert or replace + * @param tx - Optional session information for transaction context + */ + insertOrReplaceOne(doc: T, tx?: Session): Promise { return new Promise((resolve, reject) => { const docArr: Array = new Array(); docArr.push(doc); - this.insertOrReplaceMany(docArr, tx, options) + this.insertOrReplaceMany(docArr, tx) .then((docs) => resolve(docs[0])) .catch((error) => reject(error)); }); } - findOne( - filter: SelectorFilter | LogicalFilter | Selector, - tx?: Session, - readFields?: ReadFields - ): Promise { - return new Promise((resolve, reject) => { - const readRequest = new ProtoReadRequest() - .setDb(this._db) - .setCollection(this._collectionName) - .setOptions(new ProtoReadRequestOptions().setLimit(1)) - .setFilter(Utility.stringToUint8Array(Utility.filterToString(filter))); - - if (readFields) { - readRequest.setFields(Utility.stringToUint8Array(Utility.readFieldString(readFields))); - } - - const stream: grpc.ClientReadableStream = this.grpcClient.read( - readRequest, - Utility.txToMetadata(tx) - ); - - stream.on("data", (readResponse: ProtoReadResponse) => { - const doc: T = Utility.jsonStringToObj( - Utility._base64Decode(readResponse.getData_asB64()), - this.config - ); - resolve(doc); - }); - - stream.on("error", reject); - - stream.on("end", () => { - /* eslint unicorn/no-useless-undefined: ["error", {"checkArguments": false}]*/ - resolve(undefined); - }); - }); - } - + /** + * Performs a read query on collection and returns a cursor that can be used to iterate over + * query results. + * + * @param filter - Optional filter. If unspecified, then all documents will match the filter + * @param readFields - Optional field projection param allows returning only specific document fields in result + * @param tx - Optional session information for transaction context + * @param options - Optional settings for the find query + */ findMany( - filter?: SelectorFilter | LogicalFilter | Selector, + filter?: Filter, readFields?: ReadFields, tx?: Session, options?: ReadRequestOptions @@ -240,6 +253,51 @@ export class Collection { return new Cursor(initializer, this.config); } + /** + * Performs a query to find a single document in collection. Returns the document if found, else + * null. + * + * @param filter - Query to match the document + * @param readFields - Optional field projection param allows returning only specific document fields in result + * @param tx - Optional session information for transaction context + * @param options - Optional settings for the find query + */ + findOne( + filter: Filter, + readFields?: ReadFields, + tx?: Session, + options?: ReadRequestOptions + ): Promise { + return new Promise((resolve, reject) => { + if (options === undefined) { + options = new ReadRequestOptions(1); + } else { + options.limit = 1; + } + + const cursor = this.findMany(filter, readFields, tx, options); + const iteratorResult = cursor[Symbol.asyncIterator]().next(); + if (iteratorResult !== undefined) { + iteratorResult + .then( + (r) => resolve(r.value), + (error) => reject(error) + ) + .catch(reject); + } else { + /* eslint unicorn/no-useless-undefined: ["error", {"checkArguments": false}]*/ + resolve(undefined); + } + }); + } + + /** + * Search for documents in a collection. Easily perform sophisticated queries and refine + * results using filters with advanced features like faceting and ordering. + * + * @param request - Search query to execute + * @param options - Optional settings for search + */ search( request: SearchRequest, options?: SearchRequestOptions @@ -264,6 +322,13 @@ export class Collection { }); } + /** + * Search for documents in a collection. Easily perform sophisticated queries and refine + * results using filters with advanced features like faceting and ordering. + * + * @param request - Search query to execute + * @param options - Optional settings for search + */ async *searchStream( request: SearchRequest, options?: SearchRequestOptions @@ -284,12 +349,14 @@ export class Collection { return; } - delete( - filter: SelectorFilter | LogicalFilter | Selector, - tx?: Session, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - _options?: DeleteRequestOptions - ): Promise { + /** + * Deletes documents in collection matching the filter + * + * @param filter - Query to match documents to delete + * @param tx - Optional session information for transaction context + * @param options - Optional settings for delete + */ + delete(filter: Filter, tx?: Session, options?: DeleteRequestOptions): Promise { return new Promise((resolve, reject) => { if (!filter) { reject(new Error("No filter specified")); @@ -299,6 +366,10 @@ export class Collection { .setCollection(this._collectionName) .setFilter(Utility.stringToUint8Array(Utility.filterToString(filter))); + if (options !== undefined) { + deleteRequest.setOptions(Utility._deleteRequestOptionsToProtoDeleteRequestOptions(options)); + } + this.grpcClient.delete(deleteRequest, Utility.txToMetadata(tx), (error, response) => { if (error) { reject(error); @@ -313,12 +384,19 @@ export class Collection { }); } + /** + * Update multiple documents in collection + * + * @param filter - Query to match documents to apply update + * @param fields - Document fields to update and update operation + * @param tx - Optional session information for transaction context + * @param options - Optional settings for search + */ update( - filter: SelectorFilter | LogicalFilter | Selector, + filter: Filter, fields: UpdateFields | SimpleUpdateField, tx?: Session, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - _options?: UpdateRequestOptions + options?: UpdateRequestOptions ): Promise { return new Promise((resolve, reject) => { const updateRequest = new ProtoUpdateRequest() @@ -327,6 +405,10 @@ export class Collection { .setFilter(Utility.stringToUint8Array(Utility.filterToString(filter))) .setFields(Utility.stringToUint8Array(Utility.updateFieldsString(fields))); + if (options !== undefined) { + updateRequest.setOptions(Utility._updateRequestOptionsToProtoUpdateRequestOptions(options)); + } + this.grpcClient.update(updateRequest, Utility.txToMetadata(tx), (error, response) => { if (error) { reject(error); @@ -341,11 +423,12 @@ export class Collection { }); } - events( - events: EventsCallback, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - _options?: EventsRequestOptions - ) { + /** + * Consume events from a "topic" + * + * @param callback - Callback to consume events asynchronously + */ + events(callback: EventsCallback) { const eventsRequest = new ProtoEventsRequest() .setDb(this._db) .setCollection(this._collectionName); @@ -355,7 +438,7 @@ export class Collection { stream.on("data", (eventsResponse: ProtoEventsResponse) => { const event = eventsResponse.getEvent(); - events.onNext( + callback.onNext( new StreamEvent( event.getTxId_asB64(), event.getCollection(), @@ -366,7 +449,7 @@ export class Collection { ); }); - stream.on("error", (error) => events.onError(error)); - stream.on("end", () => events.onEnd()); + stream.on("error", (error) => callback.onError(error)); + stream.on("end", () => callback.onEnd()); } } diff --git a/src/cursor/abstract-cursor.ts b/src/cursor/abstract-cursor.ts index 6de6ac2..868bc32 100644 --- a/src/cursor/abstract-cursor.ts +++ b/src/cursor/abstract-cursor.ts @@ -1,6 +1,7 @@ import * as proto from "google-protobuf"; import { ClientReadableStream } from "@grpc/grpc-js"; import { TigrisCursorInUseError } from "../error"; +import { Readable } from "node:stream"; /** @internal */ export interface Initializer { @@ -40,8 +41,17 @@ export abstract class AbstractCursor { this[tClosed] = true; } + /** @internal */ + private async *next(): AsyncIterableIterator { + this._assertNotInUse(); + for await (const message of this[tStream]) { + yield this._transform(message); + } + return; + } + /** - * Returns a stream of documents to iterate on + * Returns a {@link Readable} stream of documents to iterate on * * Usage: * const cursor = myCollection.find(); @@ -52,12 +62,8 @@ export abstract class AbstractCursor { * @throws {@link TigrisCursorInUseError} - if cursor is being consumed or has been consumed. * @see {@link reset()} to re-use a cursor. */ - async *stream(): AsyncIterableIterator { - this._assertNotInUse(); - for await (const message of this[tStream]) { - yield this._transform(message); - } - return; + stream(): Readable { + return Readable.from(this.next()); } /** @@ -73,7 +79,7 @@ export abstract class AbstractCursor { * @see {@link reset()} to re-use a cursor. */ [Symbol.asyncIterator](): AsyncIterableIterator { - return this.stream()[Symbol.asyncIterator](); + return this.next()[Symbol.asyncIterator](); } /** diff --git a/src/search/types.ts b/src/search/types.ts index a526c56..b2944ea 100644 --- a/src/search/types.ts +++ b/src/search/types.ts @@ -1,4 +1,4 @@ -import { LogicalFilter, Selector, SelectorFilter, TigrisCollectionType } from "../types"; +import { Filter, TigrisCollectionType } from "../types"; import { FacetCount as ProtoFacetCount, FacetStats as ProtoFacetStats, @@ -29,7 +29,7 @@ export type SearchRequest = { /** * Filter to further refine the search results */ - filter?: SelectorFilter | LogicalFilter | Selector; + filter?: Filter; /** * Facet fields to categorically arrange indexed terms */ diff --git a/src/types.ts b/src/types.ts index 15581f2..7ff588a 100644 --- a/src/types.ts +++ b/src/types.ts @@ -189,7 +189,37 @@ export class UpdateResponse extends DMLResponse { export class WriteOptions {} -export class DeleteRequestOptions {} +export class DeleteRequestOptions { + private _collation: Collation; + + constructor(collation: Collation) { + this._collation = collation; + } + + get collation(): Collation { + return this._collation; + } + + set collation(value: Collation) { + this._collation = value; + } +} + +export class UpdateRequestOptions { + private _collation: Collation; + + constructor(collation: Collation) { + this._collation = collation; + } + + get collation(): Collation { + return this._collation; + } + + set collation(value: Collation) { + this._collation = value; + } +} export class ReadRequestOptions { static DEFAULT_LIMIT = 100; @@ -243,12 +273,8 @@ export class ReadRequestOptions { } } -export class UpdateRequestOptions {} - export class TransactionOptions {} -export class EventsRequestOptions {} - export class StreamEvent { private readonly _txId: string; private readonly _collection: string; @@ -303,8 +329,6 @@ export class TransactionResponse extends TigrisResponse { } } -export class InsertOptions {} - export class PublishOptions { private _partition: number; @@ -337,8 +361,6 @@ export class SubscribeOptions { } } -export class InsertOrReplaceOptions {} - export class ServerMetadata { private readonly _serverVersion: string; @@ -494,3 +516,5 @@ export type SelectorFilter = Partial<{ op?: SelectorFilterOperator; fields: Selector; }>; + +export type Filter = SelectorFilter | LogicalFilter | Selector; diff --git a/src/utility.ts b/src/utility.ts index 8ce3933..96406b7 100644 --- a/src/utility.ts +++ b/src/utility.ts @@ -4,6 +4,7 @@ import { Session } from "./session"; import { CollectionType, + DeleteRequestOptions, LogicalFilter, LogicalOperator, ReadFields, @@ -17,6 +18,7 @@ import { TigrisSchema, UpdateFields, UpdateFieldsOperator, + UpdateRequestOptions, } from "./types"; import * as fs from "node:fs"; import { @@ -31,8 +33,10 @@ import { } from "./search/types"; import { Collation as ProtoCollation, + DeleteRequestOptions as ProtoDeleteRequestOptions, ReadRequestOptions as ProtoReadRequestOptions, SearchRequest as ProtoSearchRequest, + UpdateRequestOptions as ProtoUpdateRequestOptions, } from "./proto/server/v1/api_pb"; import { TigrisClientConfig } from "./tigris"; @@ -334,6 +338,24 @@ export const Utility = { } return result; }, + _deleteRequestOptionsToProtoDeleteRequestOptions( + input: DeleteRequestOptions + ): ProtoDeleteRequestOptions { + const result: ProtoDeleteRequestOptions = new ProtoDeleteRequestOptions(); + if (input !== undefined && input.collation !== undefined) { + result.setCollation(new ProtoCollation().setCase(input.collation.case)); + } + return result; + }, + _updateRequestOptionsToProtoUpdateRequestOptions( + input: UpdateRequestOptions + ): ProtoUpdateRequestOptions { + const result: ProtoUpdateRequestOptions = new ProtoUpdateRequestOptions(); + if (input !== undefined && input.collation !== undefined) { + result.setCollation(new ProtoCollation().setCase(input.collation.case)); + } + return result; + }, _getArrayBlock(arraySchema: TigrisSchema | TigrisDataTypes, pkeyMap: object): object { const arrayBlock = {}; arrayBlock["type"] = "array";