@@ -122,6 +122,7 @@ export interface ExecutionContext {
122122 subscribeFieldResolver : GraphQLFieldResolver < any , any > ;
123123 errors : Array < GraphQLError > ;
124124 subsequentPayloads : Set < AsyncPayloadRecord > ;
125+ streams : Set < StreamContext > ;
125126}
126127
127128/**
@@ -504,6 +505,7 @@ export function buildExecutionContext(
504505 typeResolver : typeResolver ?? defaultTypeResolver ,
505506 subscribeFieldResolver : subscribeFieldResolver ?? defaultFieldResolver ,
506507 subsequentPayloads : new Set ( ) ,
508+ streams : new Set ( ) ,
507509 errors : [ ] ,
508510 } ;
509511}
@@ -516,6 +518,7 @@ function buildPerEventExecutionContext(
516518 ...exeContext ,
517519 rootValue : payload ,
518520 subsequentPayloads : new Set ( ) ,
521+ streams : new Set ( ) ,
519522 errors : [ ] ,
520523 } ;
521524}
@@ -1036,6 +1039,11 @@ async function completeAsyncIteratorValue(
10361039 typeof stream . initialCount === 'number' &&
10371040 index >= stream . initialCount
10381041 ) {
1042+ const streamContext : StreamContext = {
1043+ path : pathToArray ( path ) ,
1044+ iterator,
1045+ } ;
1046+ exeContext . streams . add ( streamContext ) ;
10391047 // eslint-disable-next-line @typescript-eslint/no-floating-promises
10401048 executeStreamIterator (
10411049 index ,
@@ -1045,6 +1053,7 @@ async function completeAsyncIteratorValue(
10451053 info ,
10461054 itemType ,
10471055 path ,
1056+ streamContext ,
10481057 stream . label ,
10491058 asyncPayloadRecord ,
10501059 ) ;
@@ -1129,6 +1138,7 @@ function completeListValue(
11291138 let previousAsyncPayloadRecord = asyncPayloadRecord ;
11301139 const completedResults : Array < unknown > = [ ] ;
11311140 let index = 0 ;
1141+ let streamContext : StreamContext | undefined ;
11321142 for ( const item of result ) {
11331143 // No need to modify the info object containing the path,
11341144 // since from here on it is not ever accessed by resolver functions.
@@ -1139,6 +1149,8 @@ function completeListValue(
11391149 typeof stream . initialCount === 'number' &&
11401150 index >= stream . initialCount
11411151 ) {
1152+ streamContext = { path : pathToArray ( path ) } ;
1153+ exeContext . streams . add ( streamContext ) ;
11421154 previousAsyncPayloadRecord = executeStreamField (
11431155 path ,
11441156 itemPath ,
@@ -1147,6 +1159,7 @@ function completeListValue(
11471159 fieldNodes ,
11481160 info ,
11491161 itemType ,
1162+ streamContext ,
11501163 stream . label ,
11511164 previousAsyncPayloadRecord ,
11521165 ) ;
@@ -1173,6 +1186,10 @@ function completeListValue(
11731186 index ++ ;
11741187 }
11751188
1189+ if ( streamContext ) {
1190+ exeContext . streams . delete ( streamContext ) ;
1191+ }
1192+
11761193 return containsPromise ? Promise . all ( completedResults ) : completedResults ;
11771194}
11781195
@@ -1813,6 +1830,7 @@ function executeStreamField(
18131830 fieldNodes : ReadonlyArray < FieldNode > ,
18141831 info : GraphQLResolveInfo ,
18151832 itemType : GraphQLOutputType ,
1833+ streamContext : StreamContext ,
18161834 label ?: string ,
18171835 parentContext ?: AsyncPayloadRecord ,
18181836) : AsyncPayloadRecord {
@@ -1835,6 +1853,8 @@ function executeStreamField(
18351853 ( value ) => [ value ] ,
18361854 ( error ) => {
18371855 asyncPayloadRecord . errors . push ( error ) ;
1856+ returnStreamIteratorIgnoringError ( streamContext ) ;
1857+ exeContext . streams . delete ( streamContext ) ;
18381858 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
18391859 return null ;
18401860 } ,
@@ -1867,6 +1887,8 @@ function executeStreamField(
18671887 }
18681888 } catch ( error ) {
18691889 asyncPayloadRecord . errors . push ( error ) ;
1890+ returnStreamIteratorIgnoringError ( streamContext ) ;
1891+ exeContext . streams . delete ( streamContext ) ;
18701892 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
18711893 asyncPayloadRecord . addItems ( null ) ;
18721894 return asyncPayloadRecord ;
@@ -1887,6 +1909,8 @@ function executeStreamField(
18871909 . then (
18881910 ( value ) => [ value ] ,
18891911 ( error ) => {
1912+ returnStreamIteratorIgnoringError ( streamContext ) ;
1913+ exeContext . streams . delete ( streamContext ) ;
18901914 asyncPayloadRecord . errors . push ( error ) ;
18911915 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
18921916 return null ;
@@ -1965,6 +1989,7 @@ async function executeStreamIterator(
19651989 info : GraphQLResolveInfo ,
19661990 itemType : GraphQLOutputType ,
19671991 path : Path ,
1992+ streamContext : StreamContext ,
19681993 label ?: string ,
19691994 parentContext ?: AsyncPayloadRecord ,
19701995) : Promise < void > {
@@ -1977,7 +2002,6 @@ async function executeStreamIterator(
19772002 label,
19782003 path : itemPath ,
19792004 parentContext : previousAsyncPayloadRecord ,
1980- iterator,
19812005 exeContext,
19822006 } ) ;
19832007
@@ -1995,14 +2019,10 @@ async function executeStreamIterator(
19952019 ) ;
19962020 } catch ( error ) {
19972021 asyncPayloadRecord . errors . push ( error ) ;
2022+ returnStreamIteratorIgnoringError ( streamContext ) ;
2023+ exeContext . streams . delete ( streamContext ) ;
19982024 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
19992025 asyncPayloadRecord . addItems ( null ) ;
2000- // entire stream has errored and bubbled upwards
2001- if ( iterator ?. return ) {
2002- iterator . return ( ) . catch ( ( ) => {
2003- // ignore errors
2004- } ) ;
2005- }
20062026 return ;
20072027 }
20082028
@@ -2014,6 +2034,8 @@ async function executeStreamIterator(
20142034 ( value ) => [ value ] ,
20152035 ( error ) => {
20162036 asyncPayloadRecord . errors . push ( error ) ;
2037+ returnStreamIteratorIgnoringError ( streamContext ) ;
2038+ exeContext . streams . delete ( streamContext ) ;
20172039 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
20182040 return null ;
20192041 } ,
@@ -2025,8 +2047,16 @@ async function executeStreamIterator(
20252047 asyncPayloadRecord . addItems ( completedItems ) ;
20262048
20272049 if ( done ) {
2050+ exeContext . streams . delete ( streamContext ) ;
2051+ break ;
2052+ }
2053+
2054+ if ( ! exeContext . streams . has ( streamContext ) ) {
2055+ // stream was filtered
2056+ returnStreamIteratorIgnoringError ( streamContext ) ;
20282057 break ;
20292058 }
2059+
20302060 previousAsyncPayloadRecord = asyncPayloadRecord ;
20312061 index ++ ;
20322062 }
@@ -2038,6 +2068,16 @@ function filterSubsequentPayloads(
20382068 currentAsyncRecord : AsyncPayloadRecord | undefined ,
20392069) : void {
20402070 const nullPathArray = pathToArray ( nullPath ) ;
2071+ exeContext . streams . forEach ( ( stream ) => {
2072+ for ( let i = 0 ; i < nullPathArray . length ; i ++ ) {
2073+ if ( stream . path [ i ] !== nullPathArray [ i ] ) {
2074+ // stream points to a path unaffected by this payload
2075+ return ;
2076+ }
2077+ }
2078+ returnStreamIteratorIgnoringError ( stream ) ;
2079+ exeContext . streams . delete ( stream ) ;
2080+ } ) ;
20412081 exeContext . subsequentPayloads . forEach ( ( asyncRecord ) => {
20422082 if ( asyncRecord === currentAsyncRecord ) {
20432083 // don't remove payload from where error originates
@@ -2049,16 +2089,16 @@ function filterSubsequentPayloads(
20492089 return ;
20502090 }
20512091 }
2052- // asyncRecord path points to nulled error field
2053- if ( isStreamPayload ( asyncRecord ) && asyncRecord . iterator ?. return ) {
2054- asyncRecord . iterator . return ( ) . catch ( ( ) => {
2055- // ignore error
2056- } ) ;
2057- }
20582092 exeContext . subsequentPayloads . delete ( asyncRecord ) ;
20592093 } ) ;
20602094}
20612095
2096+ function returnStreamIteratorIgnoringError ( streamContext : StreamContext ) : void {
2097+ streamContext . iterator ?. return ?.( ) . catch ( ( ) => {
2098+ // ignore error
2099+ } ) ;
2100+ }
2101+
20622102function getCompletedIncrementalResults (
20632103 exeContext : ExecutionContext ,
20642104) : Array < IncrementalResult > {
@@ -2133,12 +2173,9 @@ function yieldSubsequentPayloads(
21332173
21342174 function returnStreamIterators ( ) {
21352175 const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2136- exeContext . subsequentPayloads . forEach ( ( asyncPayloadRecord ) => {
2137- if (
2138- isStreamPayload ( asyncPayloadRecord ) &&
2139- asyncPayloadRecord . iterator ?. return
2140- ) {
2141- promises . push ( asyncPayloadRecord . iterator . return ( ) ) ;
2176+ exeContext . streams . forEach ( ( stream ) => {
2177+ if ( stream . iterator ?. return ) {
2178+ promises . push ( stream . iterator . return ( ) ) ;
21422179 }
21432180 } ) ;
21442181 return Promise . all ( promises ) ;
@@ -2211,6 +2248,10 @@ class DeferredFragmentRecord {
22112248 this . _resolve ?.( data ) ;
22122249 }
22132250}
2251+ interface StreamContext {
2252+ path : Array < string | number > ;
2253+ iterator ?: AsyncIterator < unknown > | undefined ;
2254+ }
22142255
22152256class StreamRecord {
22162257 type : 'stream' ;
@@ -2220,15 +2261,13 @@ class StreamRecord {
22202261 items : Array < unknown > | null ;
22212262 promise : Promise < void > ;
22222263 parentContext : AsyncPayloadRecord | undefined ;
2223- iterator : AsyncIterator < unknown > | undefined ;
22242264 isCompletedIterator ?: boolean ;
22252265 isCompleted : boolean ;
22262266 _exeContext : ExecutionContext ;
22272267 _resolve ?: ( arg : PromiseOrValue < Array < unknown > | null > ) => void ;
22282268 constructor ( opts : {
22292269 label : string | undefined ;
22302270 path : Path | undefined ;
2231- iterator ?: AsyncIterator < unknown > ;
22322271 parentContext : AsyncPayloadRecord | undefined ;
22332272 exeContext : ExecutionContext ;
22342273 } ) {
@@ -2237,7 +2276,6 @@ class StreamRecord {
22372276 this . label = opts . label ;
22382277 this . path = pathToArray ( opts . path ) ;
22392278 this . parentContext = opts . parentContext ;
2240- this . iterator = opts . iterator ;
22412279 this . errors = [ ] ;
22422280 this . _exeContext = opts . exeContext ;
22432281 this . _exeContext . subsequentPayloads . add ( this ) ;
0 commit comments