diff --git a/packages/multipart-parser/src/lib/multipart-request.ts b/packages/multipart-parser/src/lib/multipart-request.ts index d69ec38c754..7ba8bea4472 100644 --- a/packages/multipart-parser/src/lib/multipart-request.ts +++ b/packages/multipart-parser/src/lib/multipart-request.ts @@ -1,4 +1,4 @@ -import type { MultipartParserOptions, MultipartPart } from './multipart.ts' +import type { MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts' import { MultipartParseError, parseMultipartStream } from './multipart.ts' /** @@ -31,10 +31,10 @@ export function isMultipartRequest(request: Request): boolean { * @param options Optional parser options, such as `maxHeaderSize` and `maxFileSize` * @return An async generator yielding `MultipartPart` objects */ -export async function* parseMultipartRequest( +export async function* parseMultipartRequest( request: Request, - options?: MultipartParserOptions, -): AsyncGenerator { + options?: MultipartParserOptions, +): AsyncGenerator, void, unknown> { if (!isMultipartRequest(request)) { throw new MultipartParseError('Request is not a multipart request') } diff --git a/packages/multipart-parser/src/lib/multipart.node.ts b/packages/multipart-parser/src/lib/multipart.node.ts index a90b6c65396..a62ebd28ac2 100644 --- a/packages/multipart-parser/src/lib/multipart.node.ts +++ b/packages/multipart-parser/src/lib/multipart.node.ts @@ -1,7 +1,7 @@ import type * as http from 'node:http' import { Readable } from 'node:stream' -import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart } from './multipart.ts' +import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts' import { MultipartParseError, parseMultipart as parseMultipartWeb, @@ -19,10 +19,10 @@ import { getMultipartBoundary } from './multipart-request.ts' * @param options Options for the parser * @return A generator yielding `MultipartPart` objects */ -export function* parseMultipart( +export async function* parseMultipart( message: Buffer | Iterable, - options: ParseMultipartOptions, -): Generator { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { yield* parseMultipartWeb(message as Uint8Array | Iterable, options) } @@ -36,10 +36,10 @@ export function* parseMultipart( * @param options Options for the parser * @return An async generator yielding `MultipartPart` objects */ -export async function* parseMultipartStream( +export async function* parseMultipartStream( stream: Readable, - options: ParseMultipartOptions, -): AsyncGenerator { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { yield* parseMultipartStreamWeb(Readable.toWeb(stream) as ReadableStream, options) } @@ -61,10 +61,10 @@ export function isMultipartRequest(req: http.IncomingMessage): boolean { * @param options Options for the parser * @return An async generator yielding `MultipartPart` objects */ -export async function* parseMultipartRequest( +export async function* parseMultipartRequest( req: http.IncomingMessage, - options?: MultipartParserOptions, -): AsyncGenerator { + options?: MultipartParserOptions, +): AsyncGenerator, void, unknown> { if (!isMultipartRequest(req)) { throw new MultipartParseError('Request is not a multipart request') } diff --git a/packages/multipart-parser/src/lib/multipart.ts b/packages/multipart-parser/src/lib/multipart.ts index ec3b4b5ec5a..65906b0d6a0 100644 --- a/packages/multipart-parser/src/lib/multipart.ts +++ b/packages/multipart-parser/src/lib/multipart.ts @@ -34,7 +34,9 @@ export class MaxFileSizeExceededError extends MultipartParseError { } } -export interface ParseMultipartOptions { +export type MultipartPartType = S extends true ? MultipartPart : MultipartContentPart + +export interface ParseMultipartOptions { /** * The boundary string used to separate parts in the multipart message, * e.g. the `boundary` parameter in the `Content-Type` header. @@ -54,6 +56,48 @@ export interface ParseMultipartOptions { * Default: 2 MiB */ maxFileSize?: number + + /** + * If `true`, the parser will only store the size of each part's content, + * and will not store the actual content in memory. You must set `onEmitBytes` + * to get the content of each part as it is received. + * + * This is useful for handling large file uploads without consuming + * large amounts of memory. + * + * If this is set to `true`, the `content`, `bytes`, and `text` properties of each part + * will be undefined, and only the `size` property will be available. + * + * Default: `false` + */ + onlyStreamContents?: S + + /** + * A callback that is called each time a new part is created. This can be used to + * perform any setup or initialization for the part before any data is received. + * + * The part will contain the full header information, but the content will be empty. + * + * The callback will be awaited so you can return a promise to create backpressure. + * + * @param part The multipart part that was just created + * @returns Promise to be awaited before continuing parsing + */ + onCreatePart?(part: MultipartPartType): Promise | void + + /** + * A callback that is called each time a chunk of bytes is received and parsed. + * This can be used to process the data as it is received, to stream + * it to disk or to a cloud storage service. + * + * The callback will be awaited so you can return a promise to create backpressure. + * + * @param part The multipart part being emitted + * @param chunk The chunk of data being emitted + * @returns Promise to be awaited before continuing parsing + */ + onEmitBytes?(part: MultipartPartType, chunk: Uint8Array): Promise | void + } /** @@ -66,11 +110,11 @@ export interface ParseMultipartOptions { * @param options Options for the parser * @return A generator that yields `MultipartPart` objects */ -export function* parseMultipart( +export async function* parseMultipart( message: Uint8Array | Iterable, - options: ParseMultipartOptions, -): Generator { - let parser = new MultipartParser(options.boundary, { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { + let parser = new MultipartParser(options.boundary, { maxHeaderSize: options.maxHeaderSize, maxFileSize: options.maxFileSize, }) @@ -100,11 +144,11 @@ export function* parseMultipart( * @param options Options for the parser * @return An async generator that yields `MultipartPart` objects */ -export async function* parseMultipartStream( +export async function* parseMultipartStream( stream: ReadableStream, - options: ParseMultipartOptions, -): AsyncGenerator { - let parser = new MultipartParser(options.boundary, { + options: ParseMultipartOptions, +): AsyncGenerator, void, unknown> { + let parser = new MultipartParser(options.boundary, { maxHeaderSize: options.maxHeaderSize, maxFileSize: options.maxFileSize, }) @@ -120,7 +164,7 @@ export async function* parseMultipartStream( parser.finish() } -export type MultipartParserOptions = Omit +export type MultipartParserOptions = Omit, 'boundary'> const MultipartParserStateStart = 0 const MultipartParserStateAfterBoundary = 1 @@ -136,7 +180,7 @@ const oneMb = 1024 * oneKb /** * A streaming parser for `multipart/*` HTTP messages. */ -export class MultipartParser { +export class MultipartParser { readonly boundary: string readonly maxHeaderSize: number readonly maxFileSize: number @@ -149,10 +193,14 @@ export class MultipartParser { #state = MultipartParserStateStart #buffer: Uint8Array | null = null - #currentPart: MultipartPart | null = null + #currentPart: MultipartContentPart | MultipartPart | null = null #contentLength = 0 - constructor(boundary: string, options?: MultipartParserOptions) { + #onlyStreamContents: boolean + #onCreatePart?: (part: MultipartPart) => Promise | void + #onEmitBytes?: (part: MultipartPart, chunk: Uint8Array) => Promise | void + + constructor(boundary: string, options?: MultipartParserOptions) { this.boundary = boundary this.maxHeaderSize = options?.maxHeaderSize ?? 8 * oneKb this.maxFileSize = options?.maxFileSize ?? 2 * oneMb @@ -162,6 +210,10 @@ export class MultipartParser { this.#findBoundary = createSearch(`\r\n--${boundary}`) this.#findPartialTailBoundary = createPartialTailSearch(`\r\n--${boundary}`) this.#boundaryLength = 4 + boundary.length // length of '\r\n--' + boundary + + this.#onCreatePart = options?.onCreatePart + this.#onEmitBytes = options?.onEmitBytes + this.#onlyStreamContents = options?.onlyStreamContents ?? false } /** @@ -170,7 +222,8 @@ export class MultipartParser { * @param chunk A chunk of data to write to the parser * @return A generator yielding `MultipartPart` objects as they are parsed */ - *write(chunk: Uint8Array): Generator { + write(chunk: Uint8Array): AsyncGenerator, void, unknown>; + async *write(chunk: Uint8Array): AsyncGenerator { if (this.#state === MultipartParserStateDone) { throw new MultipartParseError('Unexpected data after end of stream') } @@ -201,16 +254,16 @@ export class MultipartParser { let partialTailIndex = this.#findPartialTailBoundary(chunk) if (partialTailIndex === -1) { - this.#append(index === 0 ? chunk : chunk.subarray(index)) + await this.#append(index === 0 ? chunk : chunk.subarray(index)) } else { - this.#append(chunk.subarray(index, partialTailIndex)) + await this.#append(chunk.subarray(index, partialTailIndex)) this.#buffer = chunk.subarray(partialTailIndex) } break } - this.#append(chunk.subarray(index, boundaryIndex)) + await this.#append(chunk.subarray(index, boundaryIndex)) yield this.#currentPart! @@ -256,13 +309,19 @@ export class MultipartParser { throw new MaxHeaderSizeExceededError(this.maxHeaderSize) } - this.#currentPart = new MultipartPart(chunk.subarray(index, headerEndIndex), []) + const header = chunk.subarray(index, headerEndIndex) + this.#currentPart = this.#onlyStreamContents + ? new MultipartPart(header) + : new MultipartContentPart(header, []) + this.#contentLength = 0 index = headerEndIndex + 4 // Skip header + \r\n\r\n this.#state = MultipartParserStateBody + await this.#onCreatePart?.(this.#currentPart) + continue } @@ -283,12 +342,19 @@ export class MultipartParser { } } - #append(chunk: Uint8Array): void { + async #append(chunk: Uint8Array): Promise { if (this.#contentLength + chunk.length > this.maxFileSize) { throw new MaxFileSizeExceededError(this.maxFileSize) } - this.#currentPart!.content.push(chunk) + if (this.#currentPart!.hasContents()) { + this.#currentPart!.content.push(chunk) + } else { + this.#currentPart!.size += chunk.length + } + + await this.#onEmitBytes?.(this.#currentPart!, chunk) + this.#contentLength += chunk.length } @@ -313,40 +379,23 @@ const decoder = new TextDecoder('utf-8', { fatal: true }) * A part of a `multipart/*` HTTP message. */ export class MultipartPart { - /** - * The raw content of this part as an array of `Uint8Array` chunks. - */ - readonly content: Uint8Array[] #header: Uint8Array #headers?: Headers + #size: number = 0 - constructor(header: Uint8Array, content: Uint8Array[]) { + constructor(header: Uint8Array) { this.#header = header - this.content = content } /** - * The content of this part as an `ArrayBuffer`. + * The size of the content emitted so far in bytes. */ - get arrayBuffer(): ArrayBuffer { - return this.bytes.buffer as ArrayBuffer + get size(): number { + return this.#size } - - /** - * The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful - * for reading the value of files that were uploaded using `` fields. - */ - get bytes(): Uint8Array { - let buffer = new Uint8Array(this.size) - - let offset = 0 - for (let chunk of this.content) { - buffer.set(chunk, offset) - offset += chunk.length - } - - return buffer + set size(value: number) { + this.#size = value } /** @@ -395,6 +444,47 @@ export class MultipartPart { return this.headers.contentDisposition.name } + hasContents(): this is MultipartContentPart { + return false + } + +} + +export class MultipartContentPart extends MultipartPart { + + /** + * The raw content of this part as an array of `Uint8Array` chunks. + */ + readonly content: Uint8Array[] + + constructor(header: Uint8Array, content: Uint8Array[]) { + super(header); + this.content = content + } + + /** + * The content of this part as an `ArrayBuffer`. + */ + get arrayBuffer(): ArrayBuffer { + return this.bytes.buffer as ArrayBuffer + } + + /** + * The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful + * for reading the value of files that were uploaded using `` fields. + */ + get bytes(): Uint8Array { + let buffer = new Uint8Array(this.size) + + let offset = 0 + for (let chunk of this.content) { + buffer.set(chunk, offset) + offset += chunk.length + } + + return buffer + } + /** * The size of the content in bytes. */ @@ -417,4 +507,8 @@ export class MultipartPart { get text(): string { return decoder.decode(this.bytes) } + + hasContents(): this is MultipartContentPart { + return true + } }