Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

result reader #50

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions api/pkgs/@duckdb/node-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,36 @@ for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
}
```

### Result Reader

Run and read all data:
```ts
const reader = await connection.runAndReadAll('from test_all_types()');
const rows = reader.getRows();
// OR: const columns = reader.getColumns();
```

Run and read up to (at lesat) some number of rows:
```ts
const reader = await connection.runAndReadUtil('from range(5000)', 1000);
const rows = reader.getRows();
// rows.length === 2048. (Rows are read in chunks of 2048.)
```

Read rows incrementally:
```ts
const reader = await connection.runAndRead('from range(5000)');
reader.readUntil(2000);
// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.)
// reader.done === false
reader.readUntil(4000);
// reader.currentRowCount === 4096
// reader.done === false
reader.readUntil(6000);
// reader.currentRowCount === 5000
// reader.done === true
```

### Inspect Data Types

```ts
Expand Down
14 changes: 14 additions & 0 deletions api/src/DuckDBConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DuckDBExtractedStatements } from './DuckDBExtractedStatements';
import { DuckDBInstance } from './DuckDBInstance';
import { DuckDBPreparedStatement } from './DuckDBPreparedStatement';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';

export class DuckDBConnection {
private readonly connection: duckdb.Connection;
Expand All @@ -24,6 +25,19 @@ export class DuckDBConnection {
public async run(sql: string): Promise<DuckDBResult> {
return new DuckDBResult(await duckdb.query(this.connection, sql));
}
public async runAndRead(sql: string): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.run(sql));
}
public async runAndReadAll(sql: string): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.run(sql));
await reader.readAll();
return reader;
}
public async runAndReadUntil(sql: string, targetRowCount: number): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.run(sql));
await reader.readUntil(targetRowCount);
return reader;
}
public async prepare(sql: string): Promise<DuckDBPreparedStatement> {
return new DuckDBPreparedStatement(
await duckdb.prepare(this.connection, sql)
Expand Down
9 changes: 7 additions & 2 deletions api/src/DuckDBDataChunk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { DuckDBValue } from './values';

export class DuckDBDataChunk {
public readonly chunk: duckdb.DataChunk;
private readonly vectors: DuckDBVector[] = [];
constructor(chunk: duckdb.DataChunk) {
this.chunk = chunk;
}
Expand All @@ -17,11 +18,15 @@ export class DuckDBDataChunk {
return duckdb.data_chunk_get_column_count(this.chunk);
}
public getColumnVector(columnIndex: number): DuckDBVector {
// TODO: cache vectors?
return DuckDBVector.create(
if (this.vectors[columnIndex]) {
return this.vectors[columnIndex];
}
const vector = DuckDBVector.create(
duckdb.data_chunk_get_vector(this.chunk, columnIndex),
this.rowCount
);
this.vectors[columnIndex] = vector;
return vector;
}
public getColumnValues(columnIndex: number): DuckDBValue[] {
return this.getColumnVector(columnIndex).toArray();
Expand Down
14 changes: 14 additions & 0 deletions api/src/DuckDBPendingResult.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';

// Values match similar enum in C API.
export enum DuckDBPendingResultState {
Expand Down Expand Up @@ -35,4 +36,17 @@ export class DuckDBPendingResult {
public async getResult(): Promise<DuckDBResult> {
return new DuckDBResult(await duckdb.execute_pending(this.pending_result));
}
public async read(): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.getResult());
}
public async readAll(): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.getResult());
await reader.readAll();
return reader;
}
public async readUntil(targetRowCount: number): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.getResult());
await reader.readUntil(targetRowCount);
return reader;
}
}
14 changes: 14 additions & 0 deletions api/src/DuckDBPreparedStatement.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBPendingResult } from './DuckDBPendingResult';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';
import { DuckDBTypeId } from './DuckDBTypeId';
import { StatementType } from './enums';
import {
Expand Down Expand Up @@ -115,6 +116,19 @@ export class DuckDBPreparedStatement {
await duckdb.execute_prepared(this.prepared_statement)
);
}
public async runAndRead(): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.run());
}
public async runAndReadAll(): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.run());
await reader.readAll();
return reader;
}
public async runAndReadUntil(targetRowCount: number): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.run());
await reader.readUntil(targetRowCount);
return reader;
}
public start(): DuckDBPendingResult {
return new DuckDBPendingResult(
duckdb.pending_prepared(this.prepared_statement)
Expand Down
43 changes: 43 additions & 0 deletions api/src/DuckDBResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import { DuckDBDataChunk } from './DuckDBDataChunk';
import { DuckDBLogicalType } from './DuckDBLogicalType';
import { DuckDBType } from './DuckDBType';
import { DuckDBTypeId } from './DuckDBTypeId';
import { DuckDBVector } from './DuckDBVector';
import { ResultReturnType, StatementType } from './enums';
import { DuckDBValue } from './values';

export class DuckDBResult {
private readonly result: duckdb.Result;
Expand Down Expand Up @@ -70,4 +72,45 @@ export class DuckDBResult {
chunks.push(chunk);
}
}
public async getColumns(): Promise<DuckDBValue[][]> {
const chunks = await this.fetchAllChunks();
if (chunks.length === 0) {
return [];
}
const firstChunk = chunks[0];
const columns: DuckDBValue[][] = [];
const columnCount = this.columnCount;
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
columns.push(firstChunk.getColumnValues(columnIndex));
}
for (let chunkIndex = 1; chunkIndex < chunks.length; chunkIndex++) {
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
const vector = chunks[chunkIndex].getColumnVector(columnIndex);
for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) {
columns[columnIndex].push(vector.getItem(itemIndex));
}
}
}
return columns;
}
public async getRows(): Promise<DuckDBValue[][]> {
const chunks = await this.fetchAllChunks();
const rows: DuckDBValue[][] = [];
for (const chunk of chunks) {
const chunkVectors: DuckDBVector[] = [];
const columnCount = chunk.columnCount;
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
chunkVectors.push(chunk.getColumnVector(columnIndex));
}
const rowCount = chunk.rowCount;
for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) {
const row: DuckDBValue[] = [];
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
row.push(chunkVectors[columnIndex].getItem(rowIndex));
}
rows.push(row);
}
}
return rows;
}
}
196 changes: 196 additions & 0 deletions api/src/DuckDBResultReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import { DuckDBDataChunk } from './DuckDBDataChunk';
import { DuckDBLogicalType } from './DuckDBLogicalType';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBType } from './DuckDBType';
import { DuckDBTypeId } from './DuckDBTypeId';
import { DuckDBVector } from './DuckDBVector';
import { ResultReturnType, StatementType } from './enums';
import { DuckDBValue } from './values';

interface ChunkSizeRun {
chunkCount: number;
chunkSize: number;
rowCount: number; // Equal to chunkCount * chunkSize; precalculated for efficiency.
}

export class DuckDBResultReader {
private readonly result: DuckDBResult;
private readonly chunks: DuckDBDataChunk[];
private readonly chunkSizeRuns: ChunkSizeRun[];
private currentRowCount_: number;
private done_: boolean;

constructor(result: DuckDBResult) {
this.result = result;
this.chunks = [];
this.chunkSizeRuns = [];
this.currentRowCount_ = 0;
this.done_ = false;
}

public get returnType(): ResultReturnType {
return this.result.returnType;
}
public get statementType(): StatementType {
return this.result.statementType;
}
public get columnCount(): number {
return this.result.columnCount;
}
public columnName(columnIndex: number): string {
return this.result.columnName(columnIndex);
}
public columnNames(): string[] {
return this.result.columnNames();
}
public columnTypeId(columnIndex: number): DuckDBTypeId {
return this.result.columnTypeId(columnIndex);
}
public columnLogicalType(columnIndex: number): DuckDBLogicalType {
return this.result.columnLogicalType(columnIndex);
}
public columnType(columnIndex: number): DuckDBType {
return this.result.columnType(columnIndex);
}
public columnTypes(): DuckDBType[] {
return this.result.columnTypes();
}
public get rowsChanged(): number {
return this.result.rowsChanged;
}

/** Total number of rows read so far. Call `readAll` or `readUntil` to read rows. */
public get currentRowCount() {
return this.currentRowCount_;
}

/** Whether reading is done, that is, there are no more rows to read. */
public get done() {
return this.done_;
}

/**
* Returns the value for the given column and row. Both are zero-indexed.
*
* Will return an error if `rowIndex` is greater than `currentRowCount`.
*/
public value(columnIndex: number, rowIndex: number): DuckDBValue {
if (this.currentRowCount_ === 0) {
throw Error(`No rows have been read`);
}
let chunkIndex = 0;
let currentRowIndex = rowIndex;
// Find which run of chunks our row is in.
// Since chunkSizeRuns shouldn't ever be longer than 2, this should be O(1).
for (const run of this.chunkSizeRuns) {
if (currentRowIndex < run.rowCount) {
// The row we're looking for is in this run.
// Calculate the chunk index and the row index in that chunk.
chunkIndex += Math.floor(currentRowIndex / run.chunkSize);
const rowIndexInChunk = currentRowIndex % run.chunkSize;
const chunk = this.chunks[chunkIndex];
return chunk.getColumnVector(columnIndex).getItem(rowIndexInChunk);
}
// The row we're looking for is not in this run.
// Update our counts for this run and move to the next one.
chunkIndex += run.chunkCount;
currentRowIndex -= run.rowCount;
}
// We didn't find our row. It must have been out of range.
throw Error(
`Row index ${rowIndex} requested, but only ${this.currentRowCount_} row have been read so far.`,
);
}

/** Read all rows. */
public async readAll(): Promise<void> {
return this.fetchChunks();
}

/**
* Read rows until at least the given target row count has been met.
*
* Note that the resulting row count could be greater than the target, since rows are read in chunks, typically of 2048 rows each.
*/
public async readUntil(targetRowCount: number): Promise<void> {
return this.fetchChunks(targetRowCount);
}

private async fetchChunks(targetRowCount?: number): Promise<void> {
while (
!(
this.done_ ||
(targetRowCount !== undefined && this.currentRowCount_ >= targetRowCount)
)
) {
const chunk = await this.result.fetchChunk();
if (chunk.rowCount > 0) {
this.updateChunkSizeRuns(chunk);
this.chunks.push(chunk);
this.currentRowCount_ += chunk.rowCount;
} else {
this.done_ = true;
}
}
}

private updateChunkSizeRuns(chunk: DuckDBDataChunk) {
if (this.chunkSizeRuns.length > 0) {
const lastRun = this.chunkSizeRuns[this.chunkSizeRuns.length - 1];
if (lastRun.chunkSize === chunk.rowCount) {
// If the new batch is the same size as the last one, just update our last run.
lastRun.chunkCount += 1;
lastRun.rowCount += lastRun.chunkSize;
return;
}
}
// If this is our first batch, or it's a different size, create a new run.
this.chunkSizeRuns.push({
chunkCount: 1,
chunkSize: chunk.rowCount,
rowCount: chunk.rowCount,
});
}

public getColumns(): DuckDBValue[][] {
if (this.chunks.length === 0) {
return [];
}
const firstChunk = this.chunks[0];
const columns: DuckDBValue[][] = [];
const columnCount = this.columnCount;
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
columns.push(firstChunk.getColumnValues(columnIndex));
}
for (let chunkIndex = 1; chunkIndex < this.chunks.length; chunkIndex++) {
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
const vector = this.chunks[chunkIndex].getColumnVector(columnIndex);
for (let itemIndex = 0; itemIndex < vector.itemCount; itemIndex++) {
columns[columnIndex].push(vector.getItem(itemIndex));
}
}
}
return columns;
}

public getRows(): DuckDBValue[][] {
const rows: DuckDBValue[][] = [];
for (const chunk of this.chunks) {
const chunkVectors: DuckDBVector[] = [];
const columnCount = chunk.columnCount;
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
chunkVectors.push(chunk.getColumnVector(columnIndex));
}
const rowCount = chunk.rowCount;
for (let rowIndex = 0; rowIndex < rowCount; rowIndex++) {
const row: DuckDBValue[] = [];
for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
row.push(chunkVectors[columnIndex].getItem(rowIndex));
}
rows.push(row);
}
}
return rows;
}

}
Loading