Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/multipart-parser/src/lib/multipart-request.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MultipartParserOptions, MultipartPart } from './multipart.ts'
import type { MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts'
import { MultipartParseError, parseMultipartStream } from './multipart.ts'

/**
Expand Down Expand Up @@ -31,10 +31,10 @@ export function isMultipartRequest(request: Request): boolean {
* @param options Optional parser options, such as `maxHeaderSize` and `maxFileSize`
* @return An async generator yielding `MultipartPart` objects
*/
export async function* parseMultipartRequest(
export async function* parseMultipartRequest<S extends boolean | undefined>(
request: Request,
options?: MultipartParserOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
options?: MultipartParserOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
if (!isMultipartRequest(request)) {
throw new MultipartParseError('Request is not a multipart request')
}
Expand Down
20 changes: 10 additions & 10 deletions packages/multipart-parser/src/lib/multipart.node.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type * as http from 'node:http'
import { Readable } from 'node:stream'

import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart } from './multipart.ts'
import type { ParseMultipartOptions, MultipartParserOptions, MultipartPart, MultipartPartType } from './multipart.ts'
import {
MultipartParseError,
parseMultipart as parseMultipartWeb,
Expand All @@ -19,10 +19,10 @@ import { getMultipartBoundary } from './multipart-request.ts'
* @param options Options for the parser
* @return A generator yielding `MultipartPart` objects
*/
export function* parseMultipart(
export async function* parseMultipart<S extends boolean | undefined>(
message: Buffer | Iterable<Buffer>,
options: ParseMultipartOptions,
): Generator<MultipartPart, void, unknown> {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
yield* parseMultipartWeb(message as Uint8Array | Iterable<Uint8Array>, options)
}

Expand All @@ -36,10 +36,10 @@ export function* parseMultipart(
* @param options Options for the parser
* @return An async generator yielding `MultipartPart` objects
*/
export async function* parseMultipartStream(
export async function* parseMultipartStream<S extends boolean | undefined>(
stream: Readable,
options: ParseMultipartOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
yield* parseMultipartStreamWeb(Readable.toWeb(stream) as ReadableStream, options)
}

Expand All @@ -61,10 +61,10 @@ export function isMultipartRequest(req: http.IncomingMessage): boolean {
* @param options Options for the parser
* @return An async generator yielding `MultipartPart` objects
*/
export async function* parseMultipartRequest(
export async function* parseMultipartRequest<S extends boolean | undefined>(
req: http.IncomingMessage,
options?: MultipartParserOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
options?: MultipartParserOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
if (!isMultipartRequest(req)) {
throw new MultipartParseError('Request is not a multipart request')
}
Expand Down
182 changes: 138 additions & 44 deletions packages/multipart-parser/src/lib/multipart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ export class MaxFileSizeExceededError extends MultipartParseError {
}
}

export interface ParseMultipartOptions {
export type MultipartPartType<S extends boolean | undefined> = S extends true ? MultipartPart : MultipartContentPart

export interface ParseMultipartOptions<S extends boolean | undefined> {
/**
* The boundary string used to separate parts in the multipart message,
* e.g. the `boundary` parameter in the `Content-Type` header.
Expand All @@ -54,6 +56,48 @@ export interface ParseMultipartOptions {
* Default: 2 MiB
*/
maxFileSize?: number

/**
* If `true`, the parser will only store the size of each part's content,
* and will not store the actual content in memory. You must set `onEmitBytes`
* to get the content of each part as it is received.
*
* This is useful for handling large file uploads without consuming
* large amounts of memory.
*
* If this is set to `true`, the `content`, `bytes`, and `text` properties of each part
* will be undefined, and only the `size` property will be available.
*
* Default: `false`
*/
onlyStreamContents?: S

/**
* A callback that is called each time a new part is created. This can be used to
* perform any setup or initialization for the part before any data is received.
*
* The part will contain the full header information, but the content will be empty.
*
* The callback will be awaited so you can return a promise to create backpressure.
*
* @param part The multipart part that was just created
* @returns Promise to be awaited before continuing parsing
*/
onCreatePart?(part: MultipartPartType<S>): Promise<void> | void

/**
* A callback that is called each time a chunk of bytes is received and parsed.
* This can be used to process the data as it is received, to stream
* it to disk or to a cloud storage service.
*
* The callback will be awaited so you can return a promise to create backpressure.
*
* @param part The multipart part being emitted
* @param chunk The chunk of data being emitted
* @returns Promise to be awaited before continuing parsing
*/
onEmitBytes?(part: MultipartPartType<S>, chunk: Uint8Array): Promise<void> | void

}

/**
Expand All @@ -66,11 +110,11 @@ export interface ParseMultipartOptions {
* @param options Options for the parser
* @return A generator that yields `MultipartPart` objects
*/
export function* parseMultipart(
export async function* parseMultipart<S extends boolean | undefined>(
message: Uint8Array | Iterable<Uint8Array>,
options: ParseMultipartOptions,
): Generator<MultipartPart, void, unknown> {
let parser = new MultipartParser(options.boundary, {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
let parser = new MultipartParser<S>(options.boundary, {
maxHeaderSize: options.maxHeaderSize,
maxFileSize: options.maxFileSize,
})
Expand Down Expand Up @@ -100,11 +144,11 @@ export function* parseMultipart(
* @param options Options for the parser
* @return An async generator that yields `MultipartPart` objects
*/
export async function* parseMultipartStream(
export async function* parseMultipartStream<S extends boolean | undefined>(
stream: ReadableStream<Uint8Array>,
options: ParseMultipartOptions,
): AsyncGenerator<MultipartPart, void, unknown> {
let parser = new MultipartParser(options.boundary, {
options: ParseMultipartOptions<S>,
): AsyncGenerator<MultipartPartType<S>, void, unknown> {
let parser = new MultipartParser<S>(options.boundary, {
maxHeaderSize: options.maxHeaderSize,
maxFileSize: options.maxFileSize,
})
Expand All @@ -120,7 +164,7 @@ export async function* parseMultipartStream(
parser.finish()
}

export type MultipartParserOptions = Omit<ParseMultipartOptions, 'boundary'>
export type MultipartParserOptions<S extends boolean | undefined> = Omit<ParseMultipartOptions<S>, 'boundary'>

const MultipartParserStateStart = 0
const MultipartParserStateAfterBoundary = 1
Expand All @@ -136,7 +180,7 @@ const oneMb = 1024 * oneKb
/**
* A streaming parser for `multipart/*` HTTP messages.
*/
export class MultipartParser {
export class MultipartParser<S extends boolean | undefined> {
readonly boundary: string
readonly maxHeaderSize: number
readonly maxFileSize: number
Expand All @@ -149,10 +193,14 @@ export class MultipartParser {

#state = MultipartParserStateStart
#buffer: Uint8Array | null = null
#currentPart: MultipartPart | null = null
#currentPart: MultipartContentPart | MultipartPart | null = null
#contentLength = 0

constructor(boundary: string, options?: MultipartParserOptions) {
#onlyStreamContents: boolean
#onCreatePart?: (part: MultipartPart) => Promise<void> | void
#onEmitBytes?: (part: MultipartPart, chunk: Uint8Array) => Promise<void> | void

constructor(boundary: string, options?: MultipartParserOptions<any>) {
this.boundary = boundary
this.maxHeaderSize = options?.maxHeaderSize ?? 8 * oneKb
this.maxFileSize = options?.maxFileSize ?? 2 * oneMb
Expand All @@ -162,6 +210,10 @@ export class MultipartParser {
this.#findBoundary = createSearch(`\r\n--${boundary}`)
this.#findPartialTailBoundary = createPartialTailSearch(`\r\n--${boundary}`)
this.#boundaryLength = 4 + boundary.length // length of '\r\n--' + boundary

this.#onCreatePart = options?.onCreatePart
this.#onEmitBytes = options?.onEmitBytes
this.#onlyStreamContents = options?.onlyStreamContents ?? false
}

/**
Expand All @@ -170,7 +222,8 @@ export class MultipartParser {
* @param chunk A chunk of data to write to the parser
* @return A generator yielding `MultipartPart` objects as they are parsed
*/
*write(chunk: Uint8Array): Generator<MultipartPart, void, unknown> {
write(chunk: Uint8Array): AsyncGenerator<MultipartPartType<S>, void, unknown>;
async *write(chunk: Uint8Array): AsyncGenerator<MultipartPart | MultipartContentPart, void, unknown> {
if (this.#state === MultipartParserStateDone) {
throw new MultipartParseError('Unexpected data after end of stream')
}
Expand Down Expand Up @@ -201,16 +254,16 @@ export class MultipartParser {
let partialTailIndex = this.#findPartialTailBoundary(chunk)

if (partialTailIndex === -1) {
this.#append(index === 0 ? chunk : chunk.subarray(index))
await this.#append(index === 0 ? chunk : chunk.subarray(index))
} else {
this.#append(chunk.subarray(index, partialTailIndex))
await this.#append(chunk.subarray(index, partialTailIndex))
this.#buffer = chunk.subarray(partialTailIndex)
}

break
}

this.#append(chunk.subarray(index, boundaryIndex))
await this.#append(chunk.subarray(index, boundaryIndex))

yield this.#currentPart!

Expand Down Expand Up @@ -256,13 +309,19 @@ export class MultipartParser {
throw new MaxHeaderSizeExceededError(this.maxHeaderSize)
}

this.#currentPart = new MultipartPart(chunk.subarray(index, headerEndIndex), [])
const header = chunk.subarray(index, headerEndIndex)
this.#currentPart = this.#onlyStreamContents
? new MultipartPart(header)
: new MultipartContentPart(header, [])

this.#contentLength = 0

index = headerEndIndex + 4 // Skip header + \r\n\r\n

this.#state = MultipartParserStateBody

await this.#onCreatePart?.(this.#currentPart)

continue
}

Expand All @@ -283,12 +342,19 @@ export class MultipartParser {
}
}

#append(chunk: Uint8Array): void {
async #append(chunk: Uint8Array): Promise<void> {
if (this.#contentLength + chunk.length > this.maxFileSize) {
throw new MaxFileSizeExceededError(this.maxFileSize)
}

this.#currentPart!.content.push(chunk)
if (this.#currentPart!.hasContents()) {
this.#currentPart!.content.push(chunk)
} else {
this.#currentPart!.size += chunk.length
}

await this.#onEmitBytes?.(this.#currentPart!, chunk)

this.#contentLength += chunk.length
}

Expand All @@ -313,40 +379,23 @@ const decoder = new TextDecoder('utf-8', { fatal: true })
* A part of a `multipart/*` HTTP message.
*/
export class MultipartPart {
/**
* The raw content of this part as an array of `Uint8Array` chunks.
*/
readonly content: Uint8Array[]

#header: Uint8Array
#headers?: Headers
#size: number = 0

constructor(header: Uint8Array, content: Uint8Array[]) {
constructor(header: Uint8Array) {
this.#header = header
this.content = content
}

/**
* The content of this part as an `ArrayBuffer`.
* The size of the content emitted so far in bytes.
*/
get arrayBuffer(): ArrayBuffer {
return this.bytes.buffer as ArrayBuffer
get size(): number {
return this.#size
}

/**
* The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful
* for reading the value of files that were uploaded using `<input type="file">` fields.
*/
get bytes(): Uint8Array {
let buffer = new Uint8Array(this.size)

let offset = 0
for (let chunk of this.content) {
buffer.set(chunk, offset)
offset += chunk.length
}

return buffer
set size(value: number) {
this.#size = value
}

/**
Expand Down Expand Up @@ -395,6 +444,47 @@ export class MultipartPart {
return this.headers.contentDisposition.name
}

hasContents(): this is MultipartContentPart {
return false
}

}

export class MultipartContentPart extends MultipartPart {

/**
* The raw content of this part as an array of `Uint8Array` chunks.
*/
readonly content: Uint8Array[]

constructor(header: Uint8Array, content: Uint8Array[]) {
super(header);
this.content = content
}

/**
* The content of this part as an `ArrayBuffer`.
*/
get arrayBuffer(): ArrayBuffer {
return this.bytes.buffer as ArrayBuffer
}

/**
* The content of this part as a single `Uint8Array`. In `multipart/form-data` messages, this is useful
* for reading the value of files that were uploaded using `<input type="file">` fields.
*/
get bytes(): Uint8Array {
let buffer = new Uint8Array(this.size)

let offset = 0
for (let chunk of this.content) {
buffer.set(chunk, offset)
offset += chunk.length
}

return buffer
}

/**
* The size of the content in bytes.
*/
Expand All @@ -417,4 +507,8 @@ export class MultipartPart {
get text(): string {
return decoder.decode(this.bytes)
}

hasContents(): this is MultipartContentPart {
return true
}
}