Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified bun.lockb
Binary file not shown.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@
"typescript": "^5.4.5",
"vite": "^5.3.3"
},
"dependencies": {
"eventsource-parser": "^1.1.2"
},
"prettier": {
"semi": false,
"tabWidth": 4,
Expand Down
71 changes: 56 additions & 15 deletions src/treaty2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
/* eslint-disable no-case-declarations */
/* eslint-disable prefer-const */
import type { Elysia } from 'elysia'
import { EventSourceParserStream } from 'eventsource-parser/stream'

import type { Treaty } from './types'

import { EdenFetchError } from '../errors'
Expand Down Expand Up @@ -112,25 +114,64 @@ const processHeaders = (
}
}

export async function* streamResponse(response: Response) {
const body = response.body
class TextDecoderStream extends TransformStream<Uint8Array, string> {
constructor() {
const decoder = new TextDecoder('utf-8', {
ignoreBOM: true
})
super({
transform(
chunk: Uint8Array,
controller: TransformStreamDefaultController<string>
) {
const decoded = decoder.decode(chunk, { stream: true })
if (decoded.length > 0) {
controller.enqueue(decoded)
}
},
flush(controller: TransformStreamDefaultController<string>) {
const output = decoder.decode()
if (output.length > 0) {
controller.enqueue(output)
}
}
})
}
}

if (!body) return
export async function* streamResponse(
response: Response
): AsyncGenerator {
const body = response.body
if (!body) return

const eventStream = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())

let reader = eventStream.getReader()
try {
while (true) {
const { done, value: event } = await reader.read()
if (done) break
if (event?.event === 'error') {
throw new EdenFetchError(500, event.data)
}
if (event) {
yield tryParsingJson(event.data)
}
}
} finally {
reader.releaseLock()
}
}

const reader = body.getReader()
const decoder = new TextDecoder()

function tryParsingJson(data: string): any {
try {
while (true) {
const { done, value } = await reader.read()
if (done) break

const data = decoder.decode(value)

yield parseStringifiedValue(data)
}
} finally {
reader.releaseLock()
return JSON.parse(data)
} catch (error) {
return null
}
}

Expand Down