diff --git a/api/pkgs/@duckdb/node-api/README.md b/api/pkgs/@duckdb/node-api/README.md index 93d8b49..4966e85 100644 --- a/api/pkgs/@duckdb/node-api/README.md +++ b/api/pkgs/@duckdb/node-api/README.md @@ -146,6 +146,36 @@ for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { } ``` +### Result Reader + +Run and read all data: +```ts +const reader = await connection.runAndReadAll('from test_all_types()'); +const rows = reader.getRows(); +// OR: const columns = reader.getColumns(); +``` + +Run and read up to (at lesat) some number of rows: +```ts +const reader = await connection.runAndReadUtil('from range(5000)', 1000); +const rows = reader.getRows(); +// rows.length === 2048. (Rows are read in chunks of 2048.) +``` + +Read rows incrementally: +```ts +const reader = await connection.runAndRead('from range(5000)'); +reader.readUntil(2000); +// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.) +// reader.done === false +reader.readUntil(4000); +// reader.currentRowCount === 4096 +// reader.done === false +reader.readUntil(6000); +// reader.currentRowCount === 5000 +// reader.done === true +``` + ### Inspect Data Types ```ts diff --git a/api/src/DuckDBConnection.ts b/api/src/DuckDBConnection.ts index 6566355..1bc2866 100644 --- a/api/src/DuckDBConnection.ts +++ b/api/src/DuckDBConnection.ts @@ -4,6 +4,7 @@ import { DuckDBExtractedStatements } from './DuckDBExtractedStatements'; import { DuckDBInstance } from './DuckDBInstance'; import { DuckDBPreparedStatement } from './DuckDBPreparedStatement'; import { DuckDBResult } from './DuckDBResult'; +import { DuckDBResultReader } from './DuckDBResultReader'; export class DuckDBConnection { private readonly connection: duckdb.Connection; @@ -24,6 +25,19 @@ export class DuckDBConnection { public async run(sql: string): Promise { return new DuckDBResult(await duckdb.query(this.connection, sql)); } + public async runAndRead(sql: string): Promise { + return new DuckDBResultReader(await this.run(sql)); + } + public async runAndReadAll(sql: string): Promise { + const reader = new DuckDBResultReader(await this.run(sql)); + await reader.readAll(); + return reader; + } + public async runAndReadUntil(sql: string, targetRowCount: number): Promise { + const reader = new DuckDBResultReader(await this.run(sql)); + await reader.readUntil(targetRowCount); + return reader; + } public async prepare(sql: string): Promise { return new DuckDBPreparedStatement( await duckdb.prepare(this.connection, sql) diff --git a/api/src/DuckDBDataChunk.ts b/api/src/DuckDBDataChunk.ts index 2e0e6c9..0d35cf4 100644 --- a/api/src/DuckDBDataChunk.ts +++ b/api/src/DuckDBDataChunk.ts @@ -4,6 +4,7 @@ import { DuckDBValue } from './values'; export class DuckDBDataChunk { public readonly chunk: duckdb.DataChunk; + private readonly vectors: DuckDBVector[] = []; constructor(chunk: duckdb.DataChunk) { this.chunk = chunk; } @@ -17,11 +18,15 @@ export class DuckDBDataChunk { return duckdb.data_chunk_get_column_count(this.chunk); } public getColumnVector(columnIndex: number): DuckDBVector { - // TODO: cache vectors? - return DuckDBVector.create( + if (this.vectors[columnIndex]) { + return this.vectors[columnIndex]; + } + const vector = DuckDBVector.create( duckdb.data_chunk_get_vector(this.chunk, columnIndex), this.rowCount ); + this.vectors[columnIndex] = vector; + return vector; } public getColumnValues(columnIndex: number): DuckDBValue[] { return this.getColumnVector(columnIndex).toArray(); diff --git a/api/src/DuckDBPendingResult.ts b/api/src/DuckDBPendingResult.ts index 34bccc3..48627c3 100644 --- a/api/src/DuckDBPendingResult.ts +++ b/api/src/DuckDBPendingResult.ts @@ -1,5 +1,6 @@ import duckdb from '@duckdb/node-bindings'; import { DuckDBResult } from './DuckDBResult'; +import { DuckDBResultReader } from './DuckDBResultReader'; // Values match similar enum in C API. export enum DuckDBPendingResultState { @@ -35,4 +36,17 @@ export class DuckDBPendingResult { public async getResult(): Promise { return new DuckDBResult(await duckdb.execute_pending(this.pending_result)); } + public async read(): Promise { + return new DuckDBResultReader(await this.getResult()); + } + public async readAll(): Promise { + const reader = new DuckDBResultReader(await this.getResult()); + await reader.readAll(); + return reader; + } + public async readUntil(targetRowCount: number): Promise { + const reader = new DuckDBResultReader(await this.getResult()); + await reader.readUntil(targetRowCount); + return reader; + } } diff --git a/api/src/DuckDBPreparedStatement.ts b/api/src/DuckDBPreparedStatement.ts index 8586dcf..7f14253 100644 --- a/api/src/DuckDBPreparedStatement.ts +++ b/api/src/DuckDBPreparedStatement.ts @@ -1,6 +1,7 @@ import duckdb from '@duckdb/node-bindings'; import { DuckDBPendingResult } from './DuckDBPendingResult'; import { DuckDBResult } from './DuckDBResult'; +import { DuckDBResultReader } from './DuckDBResultReader'; import { DuckDBTypeId } from './DuckDBTypeId'; import { StatementType } from './enums'; import { @@ -115,6 +116,19 @@ export class DuckDBPreparedStatement { await duckdb.execute_prepared(this.prepared_statement) ); } + public async runAndRead(): Promise { + return new DuckDBResultReader(await this.run()); + } + public async runAndReadAll(): Promise { + const reader = new DuckDBResultReader(await this.run()); + await reader.readAll(); + return reader; + } + public async runAndReadUntil(targetRowCount: number): Promise { + const reader = new DuckDBResultReader(await this.run()); + await reader.readUntil(targetRowCount); + return reader; + } public start(): DuckDBPendingResult { return new DuckDBPendingResult( duckdb.pending_prepared(this.prepared_statement) diff --git a/api/src/DuckDBResult.ts b/api/src/DuckDBResult.ts index 72a73dc..86f6503 100644 --- a/api/src/DuckDBResult.ts +++ b/api/src/DuckDBResult.ts @@ -3,7 +3,9 @@ import { DuckDBDataChunk } from './DuckDBDataChunk'; import { DuckDBLogicalType } from './DuckDBLogicalType'; import { DuckDBType } from './DuckDBType'; import { DuckDBTypeId } from './DuckDBTypeId'; +import { DuckDBVector } from './DuckDBVector'; import { ResultReturnType, StatementType } from './enums'; +import { DuckDBValue } from './values'; export class DuckDBResult { private readonly result: duckdb.Result; @@ -70,4 +72,45 @@ export class DuckDBResult { chunks.push(chunk); } } + public async getColumns(): Promise { + const chunks = await this.fetchAllChunks(); + if (chunks.length === 0) { + return []; + } + const firstChunk = chunks[0]; + const columns: DuckDBValue[][] = []; + const columnCount = this.columnCount; + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + columns.push(firstChunk.getColumnValues(columnIndex)); + } + for (let chunkIndex = 1; chunkIndex < chunks.length; chunkIndex++) { + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + const vector = chunks[chunkIndex].getColumnVector(columnIndex); + for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) { + columns[columnIndex].push(vector.getItem(itemIndex)); + } + } + } + return columns; + } + public async getRows(): Promise { + const chunks = await this.fetchAllChunks(); + const rows: DuckDBValue[][] = []; + for (const chunk of chunks) { + const chunkVectors: DuckDBVector[] = []; + const columnCount = chunk.columnCount; + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + chunkVectors.push(chunk.getColumnVector(columnIndex)); + } + const rowCount = chunk.rowCount; + for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) { + const row: DuckDBValue[] = []; + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + row.push(chunkVectors[columnIndex].getItem(rowIndex)); + } + rows.push(row); + } + } + return rows; + } } diff --git a/api/src/DuckDBResultReader.ts b/api/src/DuckDBResultReader.ts new file mode 100644 index 0000000..20ec5c2 --- /dev/null +++ b/api/src/DuckDBResultReader.ts @@ -0,0 +1,196 @@ +import { DuckDBDataChunk } from './DuckDBDataChunk'; +import { DuckDBLogicalType } from './DuckDBLogicalType'; +import { DuckDBResult } from './DuckDBResult'; +import { DuckDBType } from './DuckDBType'; +import { DuckDBTypeId } from './DuckDBTypeId'; +import { DuckDBVector } from './DuckDBVector'; +import { ResultReturnType, StatementType } from './enums'; +import { DuckDBValue } from './values'; + +interface ChunkSizeRun { + chunkCount: number; + chunkSize: number; + rowCount: number; // Equal to chunkCount * chunkSize; precalculated for efficiency. +} + +export class DuckDBResultReader { + private readonly result: DuckDBResult; + private readonly chunks: DuckDBDataChunk[]; + private readonly chunkSizeRuns: ChunkSizeRun[]; + private currentRowCount_: number; + private done_: boolean; + + constructor(result: DuckDBResult) { + this.result = result; + this.chunks = []; + this.chunkSizeRuns = []; + this.currentRowCount_ = 0; + this.done_ = false; + } + + public get returnType(): ResultReturnType { + return this.result.returnType; + } + public get statementType(): StatementType { + return this.result.statementType; + } + public get columnCount(): number { + return this.result.columnCount; + } + public columnName(columnIndex: number): string { + return this.result.columnName(columnIndex); + } + public columnNames(): string[] { + return this.result.columnNames(); + } + public columnTypeId(columnIndex: number): DuckDBTypeId { + return this.result.columnTypeId(columnIndex); + } + public columnLogicalType(columnIndex: number): DuckDBLogicalType { + return this.result.columnLogicalType(columnIndex); + } + public columnType(columnIndex: number): DuckDBType { + return this.result.columnType(columnIndex); + } + public columnTypes(): DuckDBType[] { + return this.result.columnTypes(); + } + public get rowsChanged(): number { + return this.result.rowsChanged; + } + + /** Total number of rows read so far. Call `readAll` or `readUntil` to read rows. */ + public get currentRowCount() { + return this.currentRowCount_; + } + + /** Whether reading is done, that is, there are no more rows to read. */ + public get done() { + return this.done_; + } + + /** + * Returns the value for the given column and row. Both are zero-indexed. + * + * Will return an error if `rowIndex` is greater than `currentRowCount`. + */ + public value(columnIndex: number, rowIndex: number): DuckDBValue { + if (this.currentRowCount_ === 0) { + throw Error(`No rows have been read`); + } + let chunkIndex = 0; + let currentRowIndex = rowIndex; + // Find which run of chunks our row is in. + // Since chunkSizeRuns shouldn't ever be longer than 2, this should be O(1). + for (const run of this.chunkSizeRuns) { + if (currentRowIndex < run.rowCount) { + // The row we're looking for is in this run. + // Calculate the chunk index and the row index in that chunk. + chunkIndex += Math.floor(currentRowIndex / run.chunkSize); + const rowIndexInChunk = currentRowIndex % run.chunkSize; + const chunk = this.chunks[chunkIndex]; + return chunk.getColumnVector(columnIndex).getItem(rowIndexInChunk); + } + // The row we're looking for is not in this run. + // Update our counts for this run and move to the next one. + chunkIndex += run.chunkCount; + currentRowIndex -= run.rowCount; + } + // We didn't find our row. It must have been out of range. + throw Error( + `Row index ${rowIndex} requested, but only ${this.currentRowCount_} row have been read so far.`, + ); + } + + /** Read all rows. */ + public async readAll(): Promise { + return this.fetchChunks(); + } + + /** + * Read rows until at least the given target row count has been met. + * + * Note that the resulting row count could be greater than the target, since rows are read in chunks, typically of 2048 rows each. + */ + public async readUntil(targetRowCount: number): Promise { + return this.fetchChunks(targetRowCount); + } + + private async fetchChunks(targetRowCount?: number): Promise { + while ( + !( + this.done_ || + (targetRowCount !== undefined && this.currentRowCount_ >= targetRowCount) + ) + ) { + const chunk = await this.result.fetchChunk(); + if (chunk.rowCount > 0) { + this.updateChunkSizeRuns(chunk); + this.chunks.push(chunk); + this.currentRowCount_ += chunk.rowCount; + } else { + this.done_ = true; + } + } + } + + private updateChunkSizeRuns(chunk: DuckDBDataChunk) { + if (this.chunkSizeRuns.length > 0) { + const lastRun = this.chunkSizeRuns[this.chunkSizeRuns.length - 1]; + if (lastRun.chunkSize === chunk.rowCount) { + // If the new batch is the same size as the last one, just update our last run. + lastRun.chunkCount += 1; + lastRun.rowCount += lastRun.chunkSize; + return; + } + } + // If this is our first batch, or it's a different size, create a new run. + this.chunkSizeRuns.push({ + chunkCount: 1, + chunkSize: chunk.rowCount, + rowCount: chunk.rowCount, + }); + } + + public getColumns(): DuckDBValue[][] { + if (this.chunks.length === 0) { + return []; + } + const firstChunk = this.chunks[0]; + const columns: DuckDBValue[][] = []; + const columnCount = this.columnCount; + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + columns.push(firstChunk.getColumnValues(columnIndex)); + } + for (let chunkIndex = 1; chunkIndex < this.chunks.length; chunkIndex++) { + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + const vector = this.chunks[chunkIndex].getColumnVector(columnIndex); + for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) { + columns[columnIndex].push(vector.getItem(itemIndex)); + } + } + } + return columns; + } + + public getRows(): DuckDBValue[][] { + const rows: DuckDBValue[][] = []; + for (const chunk of this.chunks) { + const chunkVectors: DuckDBVector[] = []; + const columnCount = chunk.columnCount; + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + chunkVectors.push(chunk.getColumnVector(columnIndex)); + } + const rowCount = chunk.rowCount; + for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) { + const row: DuckDBValue[] = []; + for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { + row.push(chunkVectors[columnIndex].getItem(rowIndex)); + } + rows.push(row); + } + } + return rows; + } + +} diff --git a/api/test/api.test.ts b/api/test/api.test.ts index 599ac10..cf1b729 100644 --- a/api/test/api.test.ts +++ b/api/test/api.test.ts @@ -957,4 +957,21 @@ describe('api', () => { assert.deepEqual(chunkRows, [[[0, 10], [1, 11], [2, 12]]]); }); }); + test('result reader', async () => { + await withConnection(async (connection) => { + const reader = await connection.runAndReadAll('select i::int as a, i::int + 10000 as b from range(5000) t(i)'); + assert.deepEqual(reader.columnNames(), ['a', 'b']); + assert.deepEqual(reader.columnTypes(), [DuckDBIntegerType.instance, DuckDBIntegerType.instance]); + const columns = reader.getColumns(); + assert.equal(columns.length, 2); + assert.equal(columns[0][0], 0); + assert.equal(columns[0][4999], 4999); + assert.equal(columns[1][0], 10000); + assert.equal(columns[1][4999], 14999); + const rows = reader.getRows(); + assert.equal(rows.length, 5000); + assert.deepEqual(rows[0], [0, 10000]); + assert.deepEqual(rows[4999], [4999, 14999]); + }); + }); });