Skip to content

Commit be6f528

Browse files
authored
Merge pull request #303 from rstefanic/feature/stream
DuckDBRowReader: Add class to stream DuckDB results efficiently
2 parents 7e0fc2c + feadcbe commit be6f528

File tree

2 files changed

+192
-0
lines changed

2 files changed

+192
-0
lines changed

api/src/DuckDBResult.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,71 @@ export class DuckDBResult {
239239
public async getRowObjectsJson(): Promise<Record<string, Json>[]> {
240240
return this.convertRowObjects(JsonDuckDBValueConverter);
241241
}
242+
243+
public async *[Symbol.asyncIterator](): AsyncIterableIterator<DuckDBDataChunk> {
244+
while (true) {
245+
const chunk = await this.fetchChunk();
246+
if (chunk && chunk.rowCount > 0) {
247+
yield chunk;
248+
} else {
249+
break;
250+
}
251+
}
252+
}
253+
254+
public async *yieldRows(): AsyncIterableIterator<DuckDBValue[][]> {
255+
for await (const chunk of this) {
256+
yield getRowsFromChunks([chunk]);
257+
}
258+
}
259+
260+
public async *yieldRowObjects(): AsyncIterableIterator<
261+
Record<string, DuckDBValue>[]
262+
> {
263+
const deduplicatedColumnNames = this.deduplicatedColumnNames();
264+
for await (const chunk of this) {
265+
yield getRowObjectsFromChunks([chunk], deduplicatedColumnNames);
266+
}
267+
}
268+
269+
public async *yieldConvertedRows<T>(
270+
converter: DuckDBValueConverter<T>
271+
): AsyncIterableIterator<(T | null)[][]> {
272+
for await (const chunk of this) {
273+
yield convertRowsFromChunks([chunk], converter);
274+
}
275+
}
276+
277+
public async *yieldConvertedRowObjects<T>(
278+
converter: DuckDBValueConverter<T>
279+
): AsyncIterableIterator<Record<string, T | null>[]> {
280+
const deduplicatedColumnNames = this.deduplicatedColumnNames();
281+
for await (const chunk of this) {
282+
yield convertRowObjectsFromChunks(
283+
[chunk],
284+
deduplicatedColumnNames,
285+
converter,
286+
);
287+
}
288+
}
289+
290+
public yieldRowsJs(): AsyncIterableIterator<JS[][]> {
291+
return this.yieldConvertedRows(JSDuckDBValueConverter);
292+
}
293+
294+
public yieldRowsJson(): AsyncIterableIterator<Json[][]> {
295+
return this.yieldConvertedRows(JsonDuckDBValueConverter);
296+
}
297+
298+
public yieldRowObjectJs(): AsyncIterableIterator<
299+
Record<string, JS>[]
300+
> {
301+
return this.yieldConvertedRowObjects(JSDuckDBValueConverter);
302+
}
303+
304+
public yieldRowObjectJson(): AsyncIterableIterator<
305+
Record<string, Json>[]
306+
> {
307+
return this.yieldConvertedRowObjects(JsonDuckDBValueConverter);
308+
}
242309
}

api/test/api.test.ts

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2445,4 +2445,129 @@ ORDER BY name
24452445
assert.equal(quotedIdentifier('table name'), '"table name"');
24462446
});
24472447
});
2448+
2449+
test("iterate over DuckDBResult stream in chunks", async () => {
2450+
await withConnection(async (connection) => {
2451+
const result = await connection.stream(
2452+
"select i::int, i::int + 10, (i + 100)::varchar from range(3) t(i)",
2453+
);
2454+
2455+
for await (const chunk of result) {
2456+
assert.strictEqual(chunk.rowCount, 3);
2457+
let i = 0;
2458+
assertValues<number, DuckDBIntegerVector>(
2459+
chunk,
2460+
i++,
2461+
DuckDBIntegerVector,
2462+
[0, 1, 2],
2463+
);
2464+
assertValues<number, DuckDBIntegerVector>(
2465+
chunk,
2466+
i++,
2467+
DuckDBIntegerVector,
2468+
[10, 11, 12],
2469+
);
2470+
assertValues<string, DuckDBVarCharVector>(
2471+
chunk,
2472+
i++,
2473+
DuckDBVarCharVector,
2474+
["100", "101", "102"],
2475+
);
2476+
}
2477+
});
2478+
});
2479+
2480+
test("iterate over many DuckDBResult chunks", async () => {
2481+
await withConnection(async (connection) => {
2482+
const chunkSize = 2048;
2483+
const totalExpectedCount = chunkSize * 3;
2484+
const result = await connection.stream(
2485+
`select i::int from range(${totalExpectedCount}) t(i)`
2486+
);
2487+
2488+
let total = 0;
2489+
for await (const chunk of result) {
2490+
assert.equal(chunk.rowCount, chunkSize);
2491+
total += chunk.rowCount;
2492+
}
2493+
2494+
assert.equal(total, totalExpectedCount);
2495+
});
2496+
});
2497+
2498+
test("iterate stream of rows", async () => {
2499+
await withConnection(async (connection) => {
2500+
const result = await connection.stream(
2501+
"select i::int, i::int + 10, (i + 100)::varchar from range(3) t(i)",
2502+
);
2503+
2504+
const expectedRows: DuckDBValue[][] = [
2505+
[0, 10, '100'],
2506+
[1, 11, '101'],
2507+
[2, 12, '102']
2508+
];
2509+
2510+
for await (const rows of result.yieldRows()) {
2511+
for (let i = 0; i < rows.length; i++) {
2512+
assert.deepEqual(rows[i], expectedRows[i]);
2513+
}
2514+
}
2515+
});
2516+
});
2517+
2518+
test("iterate stream of row objects", async () => {
2519+
await withConnection(async (connection) => {
2520+
const result = await connection.stream(
2521+
"select i::int as a, i::int + 10 as b, (i + 100)::varchar as c from range(3) t(i)",
2522+
);
2523+
2524+
const expectedRows: Record<string, DuckDBValue>[] = [
2525+
{ a: 0, b: 10, c: '100'},
2526+
{ a: 1, b: 11, c: '101'},
2527+
{ a: 2, b: 12, c: '102'}
2528+
];
2529+
2530+
for await (const rows of result.yieldRowObjects()) {
2531+
for (let i = 0; i < rows.length; i++) {
2532+
assert.deepEqual(rows[i], expectedRows[i]);
2533+
}
2534+
}
2535+
});
2536+
});
2537+
2538+
test("iterate result stream rows js", async () => {
2539+
await withConnection(async (connection) => {
2540+
const result = await connection.stream(createTestJSQuery());
2541+
for await (const row of result.yieldRowsJs()) {
2542+
assert.deepEqual(row, createTestJSRowsJS());
2543+
}
2544+
});
2545+
});
2546+
2547+
test("iterate result stream object js", async () => {
2548+
await withConnection(async (connection) => {
2549+
const result = await connection.stream(createTestJSQuery());
2550+
for await (const row of result.yieldRowObjectJs()) {
2551+
assert.deepEqual(row, createTestJSRowObjectsJS());
2552+
}
2553+
});
2554+
});
2555+
2556+
test("iterate result stream rows json", async () => {
2557+
await withConnection(async (connection) => {
2558+
const result = await connection.stream(`from test_all_types()`);
2559+
for await (const row of result.yieldRowsJson()) {
2560+
assert.deepEqual(row, createTestAllTypesRowsJson());
2561+
}
2562+
});
2563+
});
2564+
2565+
test("iterate result stream object json", async () => {
2566+
await withConnection(async (connection) => {
2567+
const result = await connection.stream(`from test_all_types()`);
2568+
for await (const row of result.yieldRowObjectJson()) {
2569+
assert.deepEqual(row, createTestAllTypesRowObjectsJson());
2570+
}
2571+
});
2572+
});
24482573
});

0 commit comments

Comments
 (0)