33
44using System . Collections . Generic ;
55using System . Runtime . CompilerServices ;
6- using System . Text ;
76using System . Threading ;
87using System . Threading . Tasks ;
98using Microsoft . Shared . Diagnostics ;
@@ -48,13 +47,12 @@ public override async Task<ChatCompletion> CompleteAsync(IList<ChatMessage> chat
4847 // concurrent callers might trigger duplicate requests, but that's acceptable.
4948 var cacheKey = GetCacheKey ( false , chatMessages , options ) ;
5049
51- if ( await ReadCacheAsync ( cacheKey , cancellationToken ) . ConfigureAwait ( false ) is ChatCompletion existing )
50+ if ( await ReadCacheAsync ( cacheKey , cancellationToken ) . ConfigureAwait ( false ) is not { } result )
5251 {
53- return existing ;
52+ result = await base . CompleteAsync ( chatMessages , options , cancellationToken ) . ConfigureAwait ( false ) ;
53+ await WriteCacheAsync ( cacheKey , result , cancellationToken ) . ConfigureAwait ( false ) ;
5454 }
5555
56- var result = await base . CompleteAsync ( chatMessages , options , cancellationToken ) . ConfigureAwait ( false ) ;
57- await WriteCacheAsync ( cacheKey , result , cancellationToken ) . ConfigureAwait ( false ) ;
5856 return result ;
5957 }
6058
@@ -64,127 +62,59 @@ public override async IAsyncEnumerable<StreamingChatCompletionUpdate> CompleteSt
6462 {
6563 _ = Throw . IfNull ( chatMessages ) ;
6664
67- var cacheKey = GetCacheKey ( true , chatMessages , options ) ;
68- if ( await ReadCacheStreamingAsync ( cacheKey , cancellationToken ) . ConfigureAwait ( false ) is { } existingChunks )
65+ if ( CoalesceStreamingUpdates )
6966 {
70- // Yield all of the cached items.
71- foreach ( var chunk in existingChunks )
67+ // When coalescing updates, we cache non-streaming results coalesced from streaming ones. That means
68+ // we make a streaming request, yielding those results, but then convert those into a non-streaming
69+ // result and cache it. When we get a cache hit, we yield the non-streaming result as a streaming one.
70+
71+ var cacheKey = GetCacheKey ( true , chatMessages , options ) ;
72+ if ( await ReadCacheAsync ( cacheKey , cancellationToken ) . ConfigureAwait ( false ) is { } chatCompletion )
7273 {
73- yield return chunk ;
74+ // Yield all of the cached items.
75+ foreach ( var chunk in chatCompletion . ToStreamingChatCompletionUpdates ( ) )
76+ {
77+ yield return chunk ;
78+ }
79+ }
80+ else
81+ {
82+ // Yield and store all of the items.
83+ List < StreamingChatCompletionUpdate > capturedItems = [ ] ;
84+ await foreach ( var chunk in base . CompleteStreamingAsync ( chatMessages , options , cancellationToken ) . ConfigureAwait ( false ) )
85+ {
86+ capturedItems . Add ( chunk ) ;
87+ yield return chunk ;
88+ }
89+
90+ // Write the captured items to the cache as a non-streaming result.
91+ await WriteCacheAsync ( cacheKey , capturedItems . ToChatCompletion ( ) , cancellationToken ) . ConfigureAwait ( false ) ;
7492 }
7593 }
7694 else
7795 {
78- // Yield and store all of the items.
79- List < StreamingChatCompletionUpdate > capturedItems = [ ] ;
80- await foreach ( var chunk in base . CompleteStreamingAsync ( chatMessages , options , cancellationToken ) . ConfigureAwait ( false ) )
96+ var cacheKey = GetCacheKey ( true , chatMessages , options ) ;
97+ if ( await ReadCacheStreamingAsync ( cacheKey , cancellationToken ) . ConfigureAwait ( false ) is { } existingChunks )
8198 {
82- capturedItems . Add ( chunk ) ;
83- yield return chunk ;
99+ // Yield all of the cached items.
100+ foreach ( var chunk in existingChunks )
101+ {
102+ yield return chunk ;
103+ }
84104 }
85-
86- // If the caching client is configured to coalesce streaming updates, do so now within the capturedItems list.
87- if ( CoalesceStreamingUpdates )
105+ else
88106 {
89- StringBuilder coalescedText = new ( ) ;
90-
91- // Iterate through all of the items in the list looking for contiguous items that can be coalesced.
92- for ( int startInclusive = 0 ; startInclusive < capturedItems . Count ; startInclusive ++ )
107+ // Yield and store all of the items.
108+ List < StreamingChatCompletionUpdate > capturedItems = [ ] ;
109+ await foreach ( var chunk in base . CompleteStreamingAsync ( chatMessages , options , cancellationToken ) . ConfigureAwait ( false ) )
93110 {
94- // If an item isn't generally coalescable, skip it.
95- StreamingChatCompletionUpdate update = capturedItems [ startInclusive ] ;
96- if ( update . ChoiceIndex != 0 ||
97- update . Contents . Count != 1 ||
98- update . Contents [ 0 ] is not TextContent textContent )
99- {
100- continue ;
101- }
102-
103- // We found a coalescable item. Look for more contiguous items that are also coalescable with it.
104- int endExclusive = startInclusive + 1 ;
105- for ( ; endExclusive < capturedItems . Count ; endExclusive ++ )
106- {
107- StreamingChatCompletionUpdate next = capturedItems [ endExclusive ] ;
108- if ( next . ChoiceIndex != 0 ||
109- next . Contents . Count != 1 ||
110- next . Contents [ 0 ] is not TextContent ||
111-
112- // changing role or author would be really strange, but check anyway
113- ( update . Role is not null && next . Role is not null && update . Role != next . Role ) ||
114- ( update . AuthorName is not null && next . AuthorName is not null && update . AuthorName != next . AuthorName ) )
115- {
116- break ;
117- }
118- }
119-
120- // If we couldn't find anything to coalesce, there's nothing to do.
121- if ( endExclusive - startInclusive <= 1 )
122- {
123- continue ;
124- }
125-
126- // We found a coalescable run of items. Create a new node to represent the run. We create a new one
127- // rather than reappropriating one of the existing ones so as not to mutate an item already yielded.
128- _ = coalescedText . Clear ( ) . Append ( capturedItems [ startInclusive ] . Text ) ;
129-
130- TextContent coalescedContent = new ( null ) // will patch the text after examining all items in the run
131- {
132- AdditionalProperties = textContent . AdditionalProperties ? . Clone ( ) ,
133- } ;
134-
135- StreamingChatCompletionUpdate coalesced = new ( )
136- {
137- AdditionalProperties = update . AdditionalProperties ? . Clone ( ) ,
138- AuthorName = update . AuthorName ,
139- CompletionId = update . CompletionId ,
140- Contents = [ coalescedContent ] ,
141- CreatedAt = update . CreatedAt ,
142- FinishReason = update . FinishReason ,
143- ModelId = update . ModelId ,
144- Role = update . Role ,
145-
146- // Explicitly don't include RawRepresentation. It's not applicable if one update ends up being used
147- // to represent multiple, and it won't be serialized anyway.
148- } ;
149-
150- // Replace the starting node with the coalesced node.
151- capturedItems [ startInclusive ] = coalesced ;
152-
153- // Now iterate through all the rest of the updates in the run, updating the coalesced node with relevant properties,
154- // and nulling out the nodes along the way. We do this rather than removing the entry in order to avoid an O(N^2) operation.
155- // We'll remove all the null entries at the end of the loop, using RemoveAll to do so, which can remove all of
156- // the nulls in a single O(N) pass.
157- for ( int i = startInclusive + 1 ; i < endExclusive ; i ++ )
158- {
159- // Grab the next item.
160- StreamingChatCompletionUpdate next = capturedItems [ i ] ;
161- capturedItems [ i ] = null ! ;
162-
163- var nextContent = ( TextContent ) next . Contents [ 0 ] ;
164- _ = coalescedText . Append ( nextContent . Text ) ;
165-
166- coalesced . AuthorName ??= next . AuthorName ;
167- coalesced . CompletionId ??= next . CompletionId ;
168- coalesced . CreatedAt ??= next . CreatedAt ;
169- coalesced . FinishReason ??= next . FinishReason ;
170- coalesced . ModelId ??= next . ModelId ;
171- coalesced . Role ??= next . Role ;
172- }
173-
174- // Complete the coalescing by patching the text of the coalesced node.
175- coalesced . Text = coalescedText . ToString ( ) ;
176-
177- // Jump to the last update in the run, so that when we loop around and bump ahead,
178- // we're at the next update just after the run.
179- startInclusive = endExclusive - 1 ;
111+ capturedItems . Add ( chunk ) ;
112+ yield return chunk ;
180113 }
181114
182- // Remove all of the null slots left over from the coalescing process .
183- _ = capturedItems . RemoveAll ( u => u is null ) ;
115+ // Write the captured items to the cache .
116+ await WriteCacheStreamingAsync ( cacheKey , capturedItems , cancellationToken ) . ConfigureAwait ( false ) ;
184117 }
185-
186- // Write the captured items to the cache.
187- await WriteCacheStreamingAsync ( cacheKey , capturedItems , cancellationToken ) . ConfigureAwait ( false ) ;
188118 }
189119 }
190120
0 commit comments