Skip to content
Merged
67 changes: 67 additions & 0 deletions api/src/DuckDBResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,71 @@ export class DuckDBResult {
public async getRowObjectsJson(): Promise<Record<string, Json>[]> {
return this.convertRowObjects(JsonDuckDBValueConverter);
}

public async *[Symbol.asyncIterator](): AsyncIterableIterator<DuckDBDataChunk> {
while (true) {
const chunk = await this.fetchChunk();
if (chunk && chunk.rowCount > 0) {
yield chunk;
} else {
break;
}
}
}

public async *yieldRows(): AsyncIterableIterator<DuckDBValue[][]> {
for await (const chunk of this) {
yield getRowsFromChunks([chunk]);
}
}

public async *yieldRowObjects(): AsyncIterableIterator<
Record<string, DuckDBValue>[]
> {
const deduplicatedColumnNames = this.deduplicatedColumnNames();
for await (const chunk of this) {
yield getRowObjectsFromChunks([chunk], deduplicatedColumnNames);
}
}

public async *yieldConvertedRows<T>(
converter: DuckDBValueConverter<T>
): AsyncIterableIterator<(T | null)[][]> {
for await (const chunk of this) {
yield convertRowsFromChunks([chunk], converter);
}
}

public async *yieldConvertedRowObjects<T>(
converter: DuckDBValueConverter<T>
): AsyncIterableIterator<Record<string, T | null>[]> {
const deduplicatedColumnNames = this.deduplicatedColumnNames();
for await (const chunk of this) {
yield convertRowObjectsFromChunks(
[chunk],
deduplicatedColumnNames,
converter,
);
}
}

public yieldRowsJs(): AsyncIterableIterator<JS[][]> {
return this.yieldConvertedRows(JSDuckDBValueConverter);
}

public yieldRowsJson(): AsyncIterableIterator<Json[][]> {
return this.yieldConvertedRows(JsonDuckDBValueConverter);
}

public yieldRowObjectJs(): AsyncIterableIterator<
Record<string, JS>[]
> {
return this.yieldConvertedRowObjects(JSDuckDBValueConverter);
}

public yieldRowObjectJson(): AsyncIterableIterator<
Record<string, Json>[]
> {
return this.yieldConvertedRowObjects(JsonDuckDBValueConverter);
}
}
125 changes: 125 additions & 0 deletions api/test/api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2445,4 +2445,129 @@ ORDER BY name
assert.equal(quotedIdentifier('table name'), '"table name"');
});
});

test("iterate over DuckDBResult stream in chunks", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(
"select i::int, i::int + 10, (i + 100)::varchar from range(3) t(i)",
);

for await (const chunk of result) {
assert.strictEqual(chunk.rowCount, 3);
let i = 0;
assertValues<number, DuckDBIntegerVector>(
chunk,
i++,
DuckDBIntegerVector,
[0, 1, 2],
);
assertValues<number, DuckDBIntegerVector>(
chunk,
i++,
DuckDBIntegerVector,
[10, 11, 12],
);
assertValues<string, DuckDBVarCharVector>(
chunk,
i++,
DuckDBVarCharVector,
["100", "101", "102"],
);
}
});
});

test("iterate over many DuckDBResult chunks", async () => {
await withConnection(async (connection) => {
const chunkSize = 2048;
const totalExpectedCount = chunkSize * 3;
const result = await connection.stream(
`select i::int from range(${totalExpectedCount}) t(i)`
);

let total = 0;
for await (const chunk of result) {
assert.equal(chunk.rowCount, chunkSize);
total += chunk.rowCount;
}

assert.equal(total, totalExpectedCount);
});
});

test("iterate stream of rows", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(
"select i::int, i::int + 10, (i + 100)::varchar from range(3) t(i)",
);

const expectedRows: DuckDBValue[][] = [
[0, 10, '100'],
[1, 11, '101'],
[2, 12, '102']
];

for await (const rows of result.yieldRows()) {
for (let i = 0; i < rows.length; i++) {
assert.deepEqual(rows[i], expectedRows[i]);
}
}
});
});

test("iterate stream of row objects", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(
"select i::int as a, i::int + 10 as b, (i + 100)::varchar as c from range(3) t(i)",
);

const expectedRows: Record<string, DuckDBValue>[] = [
{ a: 0, b: 10, c: '100'},
{ a: 1, b: 11, c: '101'},
{ a: 2, b: 12, c: '102'}
];

for await (const rows of result.yieldRowObjects()) {
for (let i = 0; i < rows.length; i++) {
assert.deepEqual(rows[i], expectedRows[i]);
}
}
});
});

test("iterate result stream rows js", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(createTestJSQuery());
for await (const row of result.yieldRowsJs()) {
assert.deepEqual(row, createTestJSRowsJS());
}
});
});

test("iterate result stream object js", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(createTestJSQuery());
for await (const row of result.yieldRowObjectJs()) {
assert.deepEqual(row, createTestJSRowObjectsJS());
}
});
});

test("iterate result stream rows json", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(`from test_all_types()`);
for await (const row of result.yieldRowsJson()) {
assert.deepEqual(row, createTestAllTypesRowsJson());
}
});
});

test("iterate result stream object json", async () => {
await withConnection(async (connection) => {
const result = await connection.stream(`from test_all_types()`);
for await (const row of result.yieldRowObjectJson()) {
assert.deepEqual(row, createTestAllTypesRowObjectsJson());
}
});
});
});
Loading