diff --git a/src/treaty2/index.ts b/src/treaty2/index.ts index 74eaa90..ad3f2f2 100644 --- a/src/treaty2/index.ts +++ b/src/treaty2/index.ts @@ -112,28 +112,75 @@ const processHeaders = ( } } -export async function* streamResponse(response: Response) { - const body = response.body - - if (!body) return - - const reader = body.getReader() - const decoder = new TextDecoder() - - try { - while (true) { - const { done, value } = await reader.read() - if (done) break +interface SSEEvent { + event: string; + data: string; + id?: string; +} - const data = decoder.decode(value) +export async function* streamSSEResponse(response: Response): AsyncGenerator { + const body = response.body; + if (!body) return; + + const reader = body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + + let eventEnd: number; + while ((eventEnd = buffer.indexOf('\n\n')) >= 0) { + const eventData = buffer.slice(0, eventEnd); + buffer = buffer.slice(eventEnd + 2); + + const event = parseEvent(eventData); + if (event) { + yield event; + } + } + } + + if (buffer.trim()) { + const event = parseEvent(buffer); + if (event) { + yield event; + } + } + } finally { + reader.releaseLock(); + } +} - yield parseStringifiedValue(data) - } - } finally { - reader.releaseLock() - } +function parseEvent(eventData: string): SSEEvent | null { + let event: string = 'message'; + let data: string[] = []; + let id: string | undefined; + + const lines = eventData.split('\n'); + for (const line of lines) { + if (line.startsWith('event:')) { + event = line.slice(6).trim(); + } else if (line.startsWith('data:')) { + data.push(line.slice(5)); + } else if (line.startsWith('id:')) { + id = line.slice(3).trim(); + } + } + + if (data.length > 0) { + const dataString = data.join('\n'); + return { event, data: dataString, id }; + } + + return null; } + const createProxy = ( domain: string, config: Treaty.Config, @@ -407,7 +454,7 @@ const createProxy = ( response.headers.get('Content-Type')?.split(';')[0] ) { case 'text/event-stream': - data = streamResponse(response) + data = streamSSEResponse(response) break case 'application/json':