@@ -122,6 +122,7 @@ export interface ExecutionContext {
122122 subscribeFieldResolver : GraphQLFieldResolver < any , any > ;
123123 errors : Array < GraphQLError > ;
124124 subsequentPayloads : Set < AsyncPayloadRecord > ;
125+ streams : Set < StreamContext > ;
125126}
126127
127128/**
@@ -504,6 +505,7 @@ export function buildExecutionContext(
504505 typeResolver : typeResolver ?? defaultTypeResolver ,
505506 subscribeFieldResolver : subscribeFieldResolver ?? defaultFieldResolver ,
506507 subsequentPayloads : new Set ( ) ,
508+ streams : new Set ( ) ,
507509 errors : [ ] ,
508510 } ;
509511}
@@ -516,6 +518,7 @@ function buildPerEventExecutionContext(
516518 ...exeContext ,
517519 rootValue : payload ,
518520 subsequentPayloads : new Set ( ) ,
521+ streams : new Set ( ) ,
519522 errors : [ ] ,
520523 } ;
521524}
@@ -1036,6 +1039,11 @@ async function completeAsyncIteratorValue(
10361039 typeof stream . initialCount === 'number' &&
10371040 index >= stream . initialCount
10381041 ) {
1042+ const streamContext : StreamContext = {
1043+ path : pathToArray ( path ) ,
1044+ iterator,
1045+ } ;
1046+ exeContext . streams . add ( streamContext ) ;
10391047 // eslint-disable-next-line @typescript-eslint/no-floating-promises
10401048 executeStreamIterator (
10411049 index ,
@@ -1045,6 +1053,7 @@ async function completeAsyncIteratorValue(
10451053 info ,
10461054 itemType ,
10471055 path ,
1056+ streamContext ,
10481057 stream . label ,
10491058 asyncPayloadRecord ,
10501059 ) ;
@@ -1129,6 +1138,7 @@ function completeListValue(
11291138 let previousAsyncPayloadRecord = asyncPayloadRecord ;
11301139 const completedResults : Array < unknown > = [ ] ;
11311140 let index = 0 ;
1141+ let streamContext : StreamContext | undefined ;
11321142 for ( const item of result ) {
11331143 // No need to modify the info object containing the path,
11341144 // since from here on it is not ever accessed by resolver functions.
@@ -1139,6 +1149,8 @@ function completeListValue(
11391149 typeof stream . initialCount === 'number' &&
11401150 index >= stream . initialCount
11411151 ) {
1152+ streamContext = { path : pathToArray ( path ) } ;
1153+ exeContext . streams . add ( streamContext ) ;
11421154 previousAsyncPayloadRecord = executeStreamField (
11431155 path ,
11441156 itemPath ,
@@ -1147,6 +1159,7 @@ function completeListValue(
11471159 fieldNodes ,
11481160 info ,
11491161 itemType ,
1162+ streamContext ,
11501163 stream . label ,
11511164 previousAsyncPayloadRecord ,
11521165 ) ;
@@ -1173,6 +1186,10 @@ function completeListValue(
11731186 index ++ ;
11741187 }
11751188
1189+ if ( streamContext ) {
1190+ exeContext . streams . delete ( streamContext ) ;
1191+ }
1192+
11761193 return containsPromise ? Promise . all ( completedResults ) : completedResults ;
11771194}
11781195
@@ -1813,6 +1830,7 @@ function executeStreamField(
18131830 fieldNodes : ReadonlyArray < FieldNode > ,
18141831 info : GraphQLResolveInfo ,
18151832 itemType : GraphQLOutputType ,
1833+ streamContext : StreamContext ,
18161834 label ?: string ,
18171835 parentContext ?: AsyncPayloadRecord ,
18181836) : AsyncPayloadRecord {
@@ -1835,6 +1853,8 @@ function executeStreamField(
18351853 ( value ) => [ value ] ,
18361854 ( error ) => {
18371855 asyncPayloadRecord . errors . push ( error ) ;
1856+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
1857+ exeContext . streams . delete ( streamContext ) ;
18381858 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
18391859 return null ;
18401860 } ,
@@ -1867,6 +1887,8 @@ function executeStreamField(
18671887 }
18681888 } catch ( error ) {
18691889 asyncPayloadRecord . errors . push ( error ) ;
1890+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
1891+ exeContext . streams . delete ( streamContext ) ;
18701892 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
18711893 asyncPayloadRecord . addItems ( null ) ;
18721894 return asyncPayloadRecord ;
@@ -1887,6 +1909,8 @@ function executeStreamField(
18871909 . then (
18881910 ( value ) => [ value ] ,
18891911 ( error ) => {
1912+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
1913+ exeContext . streams . delete ( streamContext ) ;
18901914 asyncPayloadRecord . errors . push ( error ) ;
18911915 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
18921916 return null ;
@@ -1965,19 +1989,18 @@ async function executeStreamIterator(
19651989 info : GraphQLResolveInfo ,
19661990 itemType : GraphQLOutputType ,
19671991 path : Path ,
1992+ streamContext : StreamContext ,
19681993 label ?: string ,
19691994 parentContext ?: AsyncPayloadRecord ,
19701995) : Promise < void > {
19711996 let index = initialIndex ;
19721997 let previousAsyncPayloadRecord = parentContext ?? undefined ;
1973- // eslint-disable-next-line no-constant-condition
1974- while ( true ) {
1998+ while ( exeContext . streams . has ( streamContext ) ) {
19751999 const itemPath = addPath ( path , index , undefined ) ;
19762000 const asyncPayloadRecord = new StreamRecord ( {
19772001 label,
19782002 path : itemPath ,
19792003 parentContext : previousAsyncPayloadRecord ,
1980- iterator,
19812004 exeContext,
19822005 } ) ;
19832006
@@ -1995,14 +2018,10 @@ async function executeStreamIterator(
19952018 ) ;
19962019 } catch ( error ) {
19972020 asyncPayloadRecord . errors . push ( error ) ;
2021+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
2022+ exeContext . streams . delete ( streamContext ) ;
19982023 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
19992024 asyncPayloadRecord . addItems ( null ) ;
2000- // entire stream has errored and bubbled upwards
2001- if ( iterator ?. return ) {
2002- iterator . return ( ) . catch ( ( ) => {
2003- // ignore errors
2004- } ) ;
2005- }
20062025 return ;
20072026 }
20082027
@@ -2014,6 +2033,8 @@ async function executeStreamIterator(
20142033 ( value ) => [ value ] ,
20152034 ( error ) => {
20162035 asyncPayloadRecord . errors . push ( error ) ;
2036+ returnStreamIteratorIgnoringErrors ( streamContext ) ;
2037+ exeContext . streams . delete ( streamContext ) ;
20172038 filterSubsequentPayloads ( exeContext , path , asyncPayloadRecord ) ;
20182039 return null ;
20192040 } ,
@@ -2025,6 +2046,7 @@ async function executeStreamIterator(
20252046 asyncPayloadRecord . addItems ( completedItems ) ;
20262047
20272048 if ( done ) {
2049+ exeContext . streams . delete ( streamContext ) ;
20282050 break ;
20292051 }
20302052 previousAsyncPayloadRecord = asyncPayloadRecord ;
@@ -2038,6 +2060,16 @@ function filterSubsequentPayloads(
20382060 currentAsyncRecord : AsyncPayloadRecord | undefined ,
20392061) : void {
20402062 const nullPathArray = pathToArray ( nullPath ) ;
2063+ exeContext . streams . forEach ( ( stream ) => {
2064+ for ( let i = 0 ; i < nullPathArray . length ; i ++ ) {
2065+ if ( stream . path [ i ] !== nullPathArray [ i ] ) {
2066+ // stream points to a path unaffected by this payload
2067+ return ;
2068+ }
2069+ }
2070+ returnStreamIteratorIgnoringErrors ( stream ) ;
2071+ exeContext . streams . delete ( stream ) ;
2072+ } ) ;
20412073 exeContext . subsequentPayloads . forEach ( ( asyncRecord ) => {
20422074 if ( asyncRecord === currentAsyncRecord ) {
20432075 // don't remove payload from where error originates
@@ -2049,16 +2081,21 @@ function filterSubsequentPayloads(
20492081 return ;
20502082 }
20512083 }
2052- // asyncRecord path points to nulled error field
2053- if ( isStreamPayload ( asyncRecord ) && asyncRecord . iterator ?. return ) {
2054- asyncRecord . iterator . return ( ) . catch ( ( ) => {
2055- // ignore error
2056- } ) ;
2057- }
20582084 exeContext . subsequentPayloads . delete ( asyncRecord ) ;
20592085 } ) ;
20602086}
20612087
2088+ function returnStreamIteratorIgnoringErrors (
2089+ streamContext : StreamContext ,
2090+ ) : void {
2091+ const returnFn = streamContext . iterator ?. return ;
2092+ if ( returnFn ) {
2093+ returnFn ( ) . catch ( ( ) => {
2094+ // ignore error
2095+ } ) ;
2096+ }
2097+ }
2098+
20622099function getCompletedIncrementalResults (
20632100 exeContext : ExecutionContext ,
20642101) : Array < IncrementalResult > {
@@ -2133,12 +2170,9 @@ function yieldSubsequentPayloads(
21332170
21342171 function returnStreamIterators ( ) {
21352172 const promises : Array < Promise < IteratorResult < unknown > > > = [ ] ;
2136- exeContext . subsequentPayloads . forEach ( ( asyncPayloadRecord ) => {
2137- if (
2138- isStreamPayload ( asyncPayloadRecord ) &&
2139- asyncPayloadRecord . iterator ?. return
2140- ) {
2141- promises . push ( asyncPayloadRecord . iterator . return ( ) ) ;
2173+ exeContext . streams . forEach ( ( stream ) => {
2174+ if ( stream . iterator ?. return ) {
2175+ promises . push ( stream . iterator . return ( ) ) ;
21422176 }
21432177 } ) ;
21442178 return Promise . all ( promises ) ;
@@ -2211,6 +2245,10 @@ class DeferredFragmentRecord {
22112245 this . _resolve ?.( data ) ;
22122246 }
22132247}
2248+ interface StreamContext {
2249+ path : Array < string | number > ;
2250+ iterator ?: AsyncIterator < unknown > | undefined ;
2251+ }
22142252
22152253class StreamRecord {
22162254 type : 'stream' ;
@@ -2220,15 +2258,13 @@ class StreamRecord {
22202258 items : Array < unknown > | null ;
22212259 promise : Promise < void > ;
22222260 parentContext : AsyncPayloadRecord | undefined ;
2223- iterator : AsyncIterator < unknown > | undefined ;
22242261 isCompletedIterator ?: boolean ;
22252262 isCompleted : boolean ;
22262263 _exeContext : ExecutionContext ;
22272264 _resolve ?: ( arg : PromiseOrValue < Array < unknown > | null > ) => void ;
22282265 constructor ( opts : {
22292266 label : string | undefined ;
22302267 path : Path | undefined ;
2231- iterator ?: AsyncIterator < unknown > ;
22322268 parentContext : AsyncPayloadRecord | undefined ;
22332269 exeContext : ExecutionContext ;
22342270 } ) {
@@ -2237,7 +2273,6 @@ class StreamRecord {
22372273 this . label = opts . label ;
22382274 this . path = pathToArray ( opts . path ) ;
22392275 this . parentContext = opts . parentContext ;
2240- this . iterator = opts . iterator ;
22412276 this . errors = [ ] ;
22422277 this . _exeContext = opts . exeContext ;
22432278 this . _exeContext . subsequentPayloads . add ( this ) ;
0 commit comments