Skip to content

Commit c57a572

Browse files
committed
chore: fixing race
Signed-off-by: Danny Kopping <[email protected]>
1 parent 7e07b1d commit c57a572

File tree

3 files changed

+8
-4
lines changed

3 files changed

+8
-4
lines changed

intercept_anthropic_messages_streaming.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ newStream:
414414
prompt = nil
415415
}
416416

417-
if events.hasInitiated() {
417+
if events.isStreaming() {
418418
// Check if the stream encountered any errors.
419419
if streamErr := stream.Err(); streamErr != nil {
420420
if isUnrecoverableError(streamErr) {

intercept_openai_chat_streaming.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (i *OpenAIStreamingChatInterception) ProcessRequest(w http.ResponseWriter,
172172
})
173173
}
174174

175-
if events.hasInitiated() {
175+
if events.isStreaming() {
176176
// Check if the stream encountered any errors.
177177
if streamErr := stream.Err(); streamErr != nil {
178178
if isUnrecoverableError(streamErr) {

streaming.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,14 @@ func (s *eventStream) start(w http.ResponseWriter, r *http.Request) {
7979
return
8080
case ev, open = <-s.eventsCh: // Once closed, the buffered channel will drain all buffered values before showing as closed.
8181
if !open {
82+
s.logger.Debug(ctx, "events channel closed")
8283
return
8384
}
8485

8586
// Initiate the stream once the first event is received.
8687
s.initiateOnce.Do(func() {
8788
s.initiated.Store(true)
89+
s.logger.Debug(ctx, "stream initiated")
8890

8991
// Send headers for Server-Sent Event stream.
9092
w.Header().Set("Content-Type", "text/event-stream")
@@ -185,8 +187,10 @@ func (s *eventStream) Shutdown(shutdownCtx context.Context) error {
185187
return err
186188
}
187189

188-
func (s *eventStream) hasInitiated() bool {
189-
return s.initiated.Load()
190+
// isStreaming checks if the stream has been initiated, or
191+
// when events are buffered which - when processed - will initiate the stream.
192+
func (s *eventStream) isStreaming() bool {
193+
return s.initiated.Load() || len(s.eventsCh) > 0
190194
}
191195

192196
// isConnError checks if an error is related to client disconnection or context cancellation.

0 commit comments

Comments
 (0)