Skip to content

Commit

Permalink
result reader
Browse files Browse the repository at this point in the history
  • Loading branch information
jraymakers committed Nov 4, 2024
1 parent 5f86ef1 commit 7626bc7
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 2 deletions.
14 changes: 14 additions & 0 deletions api/src/DuckDBConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DuckDBExtractedStatements } from './DuckDBExtractedStatements';
import { DuckDBInstance } from './DuckDBInstance';
import { DuckDBPreparedStatement } from './DuckDBPreparedStatement';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';

export class DuckDBConnection {
private readonly connection: duckdb.Connection;
Expand All @@ -24,6 +25,19 @@ export class DuckDBConnection {
/** Executes the given SQL on this connection and wraps the outcome in a DuckDBResult. */
public async run(sql: string): Promise<DuckDBResult> {
  const queryResult = await duckdb.query(this.connection, sql);
  return new DuckDBResult(queryResult);
}
/**
 * Runs the given SQL and returns a reader over its result.
 * No rows are read here; the caller drives reading through the returned reader.
 */
public async runAndRead(sql: string): Promise<DuckDBResultReader> {
  const result = await this.run(sql);
  return new DuckDBResultReader(result);
}
/** Runs the given SQL and returns a reader after `readAll` has completed on it. */
public async runAndReadAll(sql: string): Promise<DuckDBResultReader> {
  const result = await this.run(sql);
  const resultReader = new DuckDBResultReader(result);
  await resultReader.readAll();
  return resultReader;
}
/**
 * Runs the given SQL and returns a reader after `readUntil(targetRowCount)`
 * has completed on it.
 */
public async runAndReadUntil(
  sql: string,
  targetRowCount: number
): Promise<DuckDBResultReader> {
  const result = await this.run(sql);
  const resultReader = new DuckDBResultReader(result);
  await resultReader.readUntil(targetRowCount);
  return resultReader;
}
public async prepare(sql: string): Promise<DuckDBPreparedStatement> {
return new DuckDBPreparedStatement(
await duckdb.prepare(this.connection, sql)
Expand Down
9 changes: 7 additions & 2 deletions api/src/DuckDBDataChunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DuckDBValue } from './values';

export class DuckDBDataChunk {
// The underlying data chunk object from the duckdb bindings that this class wraps.
public readonly chunk: duckdb.DataChunk;
// Cache of column vectors, indexed by column index; entries are filled in by getColumnVector.
private readonly vectors: DuckDBVector[] = [];
// Wraps the given data chunk; the vector cache starts out empty.
constructor(chunk: duckdb.DataChunk) {
this.chunk = chunk;
}
Expand All @@ -17,11 +18,15 @@ export class DuckDBDataChunk {
return duckdb.data_chunk_get_column_count(this.chunk);
}
/**
 * Returns the vector for the given column of this chunk.
 *
 * The vector is created on first access and stored in `vectors`, so repeated
 * calls with the same column index return the same DuckDBVector instance.
 */
public getColumnVector(columnIndex: number): DuckDBVector {
  const cached = this.vectors[columnIndex];
  if (cached !== undefined) {
    return cached;
  }
  const vector = DuckDBVector.create(
    duckdb.data_chunk_get_vector(this.chunk, columnIndex),
    this.rowCount
  );
  this.vectors[columnIndex] = vector;
  return vector;
}
public getColumnValues(columnIndex: number): DuckDBValue[] {
return this.getColumnVector(columnIndex).toArray();
Expand Down
14 changes: 14 additions & 0 deletions api/src/DuckDBPendingResult.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';

// Values match similar enum in C API.
export enum DuckDBPendingResultState {
Expand Down Expand Up @@ -35,4 +36,17 @@ export class DuckDBPendingResult {
/** Executes the pending result and wraps the outcome in a DuckDBResult. */
public async getResult(): Promise<DuckDBResult> {
  const executed = await duckdb.execute_pending(this.pending_result);
  return new DuckDBResult(executed);
}
/**
 * Obtains the result via `getResult` and returns a reader over it.
 * No rows are read here; the caller drives reading through the returned reader.
 */
public async read(): Promise<DuckDBResultReader> {
  const result = await this.getResult();
  return new DuckDBResultReader(result);
}
/** Obtains the result via `getResult` and returns a reader after `readAll` has completed on it. */
public async readAll(): Promise<DuckDBResultReader> {
  const result = await this.getResult();
  const resultReader = new DuckDBResultReader(result);
  await resultReader.readAll();
  return resultReader;
}
/**
 * Obtains the result via `getResult` and returns a reader after
 * `readUntil(targetRowCount)` has completed on it.
 */
public async readUntil(targetRowCount: number): Promise<DuckDBResultReader> {
  const result = await this.getResult();
  const resultReader = new DuckDBResultReader(result);
  await resultReader.readUntil(targetRowCount);
  return resultReader;
}
}
14 changes: 14 additions & 0 deletions api/src/DuckDBPreparedStatement.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBPendingResult } from './DuckDBPendingResult';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';
import { DuckDBTypeId } from './DuckDBTypeId';
import { StatementType } from './enums';
import {
Expand Down Expand Up @@ -115,6 +116,19 @@ export class DuckDBPreparedStatement {
await duckdb.execute_prepared(this.prepared_statement)
);
}
/**
 * Runs this prepared statement and returns a reader over its result.
 * No rows are read here; the caller drives reading through the returned reader.
 */
public async runAndRead(): Promise<DuckDBResultReader> {
  const result = await this.run();
  return new DuckDBResultReader(result);
}
/** Runs this prepared statement and returns a reader after `readAll` has completed on it. */
public async runAndReadAll(): Promise<DuckDBResultReader> {
  const result = await this.run();
  const resultReader = new DuckDBResultReader(result);
  await resultReader.readAll();
  return resultReader;
}
/**
 * Runs this prepared statement and returns a reader after
 * `readUntil(targetRowCount)` has completed on it.
 */
public async runAndReadUntil(
  targetRowCount: number
): Promise<DuckDBResultReader> {
  const result = await this.run();
  const resultReader = new DuckDBResultReader(result);
  await resultReader.readUntil(targetRowCount);
  return resultReader;
}
public start(): DuckDBPendingResult {
return new DuckDBPendingResult(
duckdb.pending_prepared(this.prepared_statement)
Expand Down
43 changes: 43 additions & 0 deletions api/src/DuckDBResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import { DuckDBDataChunk } from './DuckDBDataChunk';
import { DuckDBLogicalType } from './DuckDBLogicalType';
import { DuckDBType } from './DuckDBType';
import { DuckDBTypeId } from './DuckDBTypeId';
import { DuckDBVector } from './DuckDBVector';
import { ResultReturnType, StatementType } from './enums';
import { DuckDBValue } from './values';

export class DuckDBResult {
private readonly result: duckdb.Result;
Expand Down Expand Up @@ -70,4 +72,45 @@ export class DuckDBResult {
chunks.push(chunk);
}
}
/**
 * Fetches all chunks and returns the values column by column:
 * one array per column, each holding that column's values across all chunks.
 *
 * Returns an empty array when no chunks were fetched.
 */
public async getColumns(): Promise<DuckDBValue[][]> {
  const [headChunk, ...tailChunks] = await this.fetchAllChunks();
  if (headChunk === undefined) {
    return [];
  }
  const width = this.columnCount;
  // Seed each column with the first chunk's values, then append the rest.
  const columns: DuckDBValue[][] = [];
  for (let col = 0; col < width; col++) {
    columns.push(headChunk.getColumnValues(col));
  }
  for (const chunk of tailChunks) {
    for (let col = 0; col < width; col++) {
      const vector = chunk.getColumnVector(col);
      for (let item = 0; item < vector.itemCount; item++) {
        columns[col].push(vector.getItem(item));
      }
    }
  }
  return columns;
}
/**
 * Fetches all chunks and returns the values row by row:
 * one array per row, each holding that row's value for every column.
 */
public async getRows(): Promise<DuckDBValue[][]> {
  const allChunks = await this.fetchAllChunks();
  const rows: DuckDBValue[][] = [];
  for (const chunk of allChunks) {
    const width = chunk.columnCount;
    // Resolve each column's vector once per chunk rather than once per row.
    const vectors: DuckDBVector[] = [];
    for (let col = 0; col < width; col++) {
      vectors.push(chunk.getColumnVector(col));
    }
    const height = chunk.rowCount;
    for (let rowInChunk = 0; rowInChunk < height; rowInChunk++) {
      rows.push(vectors.map((vector) => vector.getItem(rowInChunk)));
    }
  }
  return rows;
}
}
196 changes: 196 additions & 0 deletions api/src/DuckDBResultReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { DuckDBDataChunk } from './DuckDBDataChunk';
import { DuckDBLogicalType } from './DuckDBLogicalType';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBType } from './DuckDBType';
import { DuckDBTypeId } from './DuckDBTypeId';
import { DuckDBVector } from './DuckDBVector';
import { ResultReturnType, StatementType } from './enums';
import { DuckDBValue } from './values';

/**
 * Summary of a run of consecutive chunks that all have the same row count.
 * DuckDBResultReader keeps a list of these so that a row index can be mapped
 * to a chunk with arithmetic instead of walking every chunk.
 */
interface ChunkSizeRun {
// Number of consecutive chunks in this run.
chunkCount: number;
// Row count shared by every chunk in this run.
chunkSize: number;
rowCount: number; // Equal to chunkCount * chunkSize; precalculated for efficiency.
}

/**
 * Reads the chunks of a DuckDBResult on demand and retains them, so that values
 * can be looked up by (column, row) position or materialized as columns or rows.
 *
 * Nothing is read at construction time; call `readAll` or `readUntil` first.
 * Metadata accessors (column names, types, etc.) delegate to the wrapped result.
 */
export class DuckDBResultReader {
  /** The result being read; all metadata accessors delegate to it. */
  private readonly result: DuckDBResult;
  /** Every non-empty chunk fetched so far, in fetch order. */
  private readonly chunks: DuckDBDataChunk[];
  /** Run-length summary of the sizes of `chunks`, used by `value` to locate a row. */
  private readonly chunkSizeRuns: ChunkSizeRun[];
  /** Sum of the row counts of all chunks in `chunks`. */
  private currentRowCount_: number;
  /** Set once a fetched chunk turned out to be empty. */
  private done_: boolean;

  constructor(result: DuckDBResult) {
    this.result = result;
    this.chunks = [];
    this.chunkSizeRuns = [];
    this.currentRowCount_ = 0;
    this.done_ = false;
  }

  public get returnType(): ResultReturnType {
    return this.result.returnType;
  }
  public get statementType(): StatementType {
    return this.result.statementType;
  }
  public get columnCount(): number {
    return this.result.columnCount;
  }
  public columnName(columnIndex: number): string {
    return this.result.columnName(columnIndex);
  }
  public columnNames(): string[] {
    return this.result.columnNames();
  }
  public columnTypeId(columnIndex: number): DuckDBTypeId {
    return this.result.columnTypeId(columnIndex);
  }
  public columnLogicalType(columnIndex: number): DuckDBLogicalType {
    return this.result.columnLogicalType(columnIndex);
  }
  public columnType(columnIndex: number): DuckDBType {
    return this.result.columnType(columnIndex);
  }
  public columnTypes(): DuckDBType[] {
    return this.result.columnTypes();
  }
  public get rowsChanged(): number {
    return this.result.rowsChanged;
  }

  /** Total number of rows read so far. Call `readAll` or `readUntil` to read rows. */
  public get currentRowCount(): number {
    return this.currentRowCount_;
  }

  /** Whether reading is done, that is, there are no more rows to read. */
  public get done(): boolean {
    return this.done_;
  }

  /**
   * Returns the value for the given column and row. Both are zero-indexed.
   *
   * @throws Error if no rows have been read yet, if `rowIndex` is not a
   * non-negative integer, or if `rowIndex` is greater than or equal to
   * `currentRowCount`.
   */
  public value(columnIndex: number, rowIndex: number): DuckDBValue {
    if (this.currentRowCount_ === 0) {
      throw new Error(`No rows have been read`);
    }
    // Reject negative or fractional indexes up front: the arithmetic below
    // would otherwise produce a negative chunk index or a fractional row.
    if (!Number.isInteger(rowIndex) || rowIndex < 0) {
      throw new Error(`Row index ${rowIndex} is not a non-negative integer.`);
    }
    let chunkIndex = 0;
    let remainingRowIndex = rowIndex;
    // Find which run of chunks the row is in. The list of runs is expected to
    // be very short (equal-sized chunks collapse into one run), so this loop
    // is cheap even for results with many chunks.
    for (const run of this.chunkSizeRuns) {
      if (remainingRowIndex < run.rowCount) {
        // The row is in this run: all its chunks have the same size, so the
        // chunk and the row within that chunk follow from division/modulo.
        chunkIndex += Math.floor(remainingRowIndex / run.chunkSize);
        const rowIndexInChunk = remainingRowIndex % run.chunkSize;
        const chunk = this.chunks[chunkIndex];
        return chunk.getColumnVector(columnIndex).getItem(rowIndexInChunk);
      }
      // Not in this run: skip over its chunks and rows and try the next one.
      chunkIndex += run.chunkCount;
      remainingRowIndex -= run.rowCount;
    }
    // No run contained the row, so the index is past the rows read so far.
    throw new Error(
      `Row index ${rowIndex} requested, but only ${this.currentRowCount_} rows have been read so far.`,
    );
  }

  /** Read all rows. */
  public async readAll(): Promise<void> {
    return this.fetchChunks();
  }

  /**
   * Read rows until at least the given target row count has been met.
   *
   * Note that the resulting row count could be greater than the target, since rows are read in chunks, typically of 2048 rows each.
   */
  public async readUntil(targetRowCount: number): Promise<void> {
    return this.fetchChunks(targetRowCount);
  }

  /**
   * Fetches chunks from the result until reading is done or, when a target is
   * given, until at least `targetRowCount` rows have been read in total.
   * An empty chunk marks the end of the result and is not retained.
   */
  private async fetchChunks(targetRowCount?: number): Promise<void> {
    while (!this.done_) {
      if (
        targetRowCount !== undefined &&
        this.currentRowCount_ >= targetRowCount
      ) {
        return;
      }
      const chunk = await this.result.fetchChunk();
      if (chunk.rowCount > 0) {
        this.updateChunkSizeRuns(chunk);
        this.chunks.push(chunk);
        this.currentRowCount_ += chunk.rowCount;
      } else {
        this.done_ = true;
      }
    }
  }

  /**
   * Records the size of a newly fetched chunk: extends the last run when the
   * chunk has the same row count as that run, otherwise starts a new run.
   */
  private updateChunkSizeRuns(chunk: DuckDBDataChunk): void {
    if (this.chunkSizeRuns.length > 0) {
      const lastRun = this.chunkSizeRuns[this.chunkSizeRuns.length - 1];
      if (lastRun.chunkSize === chunk.rowCount) {
        // Same size as the last chunk: just extend the last run.
        lastRun.chunkCount += 1;
        lastRun.rowCount += lastRun.chunkSize;
        return;
      }
    }
    // First chunk, or a chunk of a different size: start a new run.
    this.chunkSizeRuns.push({
      chunkCount: 1,
      chunkSize: chunk.rowCount,
      rowCount: chunk.rowCount,
    });
  }

  /**
   * Returns the rows read so far column by column: one array per column, each
   * holding that column's values across all chunks read so far.
   *
   * Returns an empty array when no chunks have been read.
   */
  public getColumns(): DuckDBValue[][] {
    if (this.chunks.length === 0) {
      return [];
    }
    const firstChunk = this.chunks[0];
    const columns: DuckDBValue[][] = [];
    const columnCount = this.columnCount;
    // Seed each column with the first chunk's values, then append the rest.
    for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
      columns.push(firstChunk.getColumnValues(columnIndex));
    }
    for (let chunkIndex = 1; chunkIndex < this.chunks.length; chunkIndex++) {
      const chunk = this.chunks[chunkIndex];
      for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
        const vector = chunk.getColumnVector(columnIndex);
        for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) {
          columns[columnIndex].push(vector.getItem(itemIndex));
        }
      }
    }
    return columns;
  }

  /**
   * Returns the rows read so far row by row: one array per row, each holding
   * that row's value for every column.
   */
  public getRows(): DuckDBValue[][] {
    const rows: DuckDBValue[][] = [];
    for (const chunk of this.chunks) {
      // Resolve each column's vector once per chunk rather than once per row.
      const chunkVectors: DuckDBVector[] = [];
      const columnCount = chunk.columnCount;
      for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
        chunkVectors.push(chunk.getColumnVector(columnIndex));
      }
      const rowCount = chunk.rowCount;
      for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) {
        const row: DuckDBValue[] = [];
        for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
          row.push(chunkVectors[columnIndex].getItem(rowIndex));
        }
        rows.push(row);
      }
    }
    return rows;
  }
}
17 changes: 17 additions & 0 deletions api/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -957,4 +957,21 @@ describe('api', () => {
assert.deepEqual(chunkRows, [[[0, 10], [1, 11], [2, 12]]]);
});
});
// Exercises DuckDBResultReader end to end via runAndReadAll: column metadata,
// column-wise access, and row-wise access over a 5000-row, two-column result.
// NOTE(review): 5000 rows presumably spans several chunks (the reader docs mention
// chunks of typically 2048 rows), so this should cover multi-chunk stitching — confirm.
test('result reader', async () => {
await withConnection(async (connection) => {
const reader = await connection.runAndReadAll('select i::int as a, i::int + 10000 as b from range(5000) t(i)');
assert.deepEqual(reader.columnNames(), ['a', 'b']);
assert.deepEqual(reader.columnTypes(), [DuckDBIntegerType.instance, DuckDBIntegerType.instance]);
// Column-wise view: check the first and last value of each column.
const columns = reader.getColumns();
assert.equal(columns.length, 2);
assert.equal(columns[0][0], 0);
assert.equal(columns[0][4999], 4999);
assert.equal(columns[1][0], 10000);
assert.equal(columns[1][4999], 14999);
// Row-wise view: check the row count and the first and last rows.
const rows = reader.getRows();
assert.equal(rows.length, 5000);
assert.deepEqual(rows[0], [0, 10000]);
assert.deepEqual(rows[4999], [4999, 14999]);
});
});
});

0 comments on commit 7626bc7

Please sign in to comment.