diff --git a/bun.lockb b/bun.lockb index 50d67c8..7af5776 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/package.json b/package.json index 86069b3..f23d661 100644 --- a/package.json +++ b/package.json @@ -76,6 +76,9 @@ "typescript": "^5.4.5", "vite": "^5.3.3" }, + "dependencies": { + "eventsource-parser": "^1.1.2" + }, "prettier": { "semi": false, "tabWidth": 4, diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 74eaa90..7af212c 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -2,6 +2,8 @@ /* eslint-disable no-case-declarations */ /* eslint-disable prefer-const */ import type { Elysia } from 'elysia' +import { EventSourceParserStream } from 'eventsource-parser/stream' + import type { Treaty } from './types' import { EdenFetchError } from '../errors' @@ -112,25 +114,64 @@ const processHeaders = ( } } -export async function* streamResponse(response: Response) { - const body = response.body +class TextDecoderStream extends TransformStream { + constructor() { + const decoder = new TextDecoder('utf-8', { + ignoreBOM: true + }) + super({ + transform( + chunk: Uint8Array, + controller: TransformStreamDefaultController + ) { + const decoded = decoder.decode(chunk, { stream: true }) + if (decoded.length > 0) { + controller.enqueue(decoded) + } + }, + flush(controller: TransformStreamDefaultController) { + const output = decoder.decode() + if (output.length > 0) { + controller.enqueue(output) + } + } + }) + } +} - if (!body) return +export async function* streamResponse( + response: Response +): AsyncGenerator { + const body = response.body + if (!body) return + + const eventStream = response.body + .pipeThrough(new TextDecoderStream()) + .pipeThrough(new EventSourceParserStream()) + + let reader = eventStream.getReader() + try { + while (true) { + const { done, value: event } = await reader.read() + if (done) break + if (event?.event === 'error') { + throw new EdenFetchError(500, event.data) + } + if (event) { + yield tryParsingJson(event.data) + } + } + } finally { + reader.releaseLock() + } +} - const reader = body.getReader() - const decoder = new TextDecoder() +function tryParsingJson(data: string): any { try { - while (true) { - const { done, value } = await reader.read() - if (done) break - - const data = decoder.decode(value) - - yield parseStringifiedValue(data) - } - } finally { - reader.releaseLock() + return JSON.parse(data) + } catch (error) { + return null } }