@@ -292,20 +292,25 @@ function onStreamClose(code) {
292292 tryClose ( stream [ kState ] . fd ) ;
293293
294294 // Defer destroy we actually emit end.
295- if ( stream . _readableState . endEmitted || code !== NGHTTP2_NO_ERROR ) {
295+ if ( ! stream . readable || code !== NGHTTP2_NO_ERROR ) {
296296 // If errored or ended, we can destroy immediately.
297- stream [ kMaybeDestroy ] ( null , code ) ;
297+ stream [ kMaybeDestroy ] ( code ) ;
298298 } else {
299299 // Wait for end to destroy.
300300 stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
301301 // Push a null so the stream can end whenever the client consumes
302302 // it completely.
303303 stream . push ( null ) ;
304- // If the client hasn't tried to consume the stream and there is no
305- // resume scheduled (which would indicate they would consume in the future),
306- // then just dump the incoming data so that the stream can be destroyed.
307- if ( ! stream [ kState ] . didRead && ! stream . _readableState . resumeScheduled )
304+
305+ // If the user hasn't tried to consume the stream (and this is a server
306+ // session) then just dump the incoming data so that the stream can
307+ // be destroyed.
308+ if ( stream [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
309+ ! stream [ kState ] . didRead &&
310+ stream . readableFlowing === null )
308311 stream . resume ( ) ;
312+ else
313+ stream . read ( 0 ) ;
309314 }
310315}
311316
@@ -330,7 +335,7 @@ function onStreamRead(nread, buf) {
330335 `${ sessionName ( stream [ kSession ] [ kType ] ) } ]: ending readable.` ) ;
331336
332337 // defer this until we actually emit end
333- if ( stream . _readableState . endEmitted ) {
338+ if ( ! stream . readable ) {
334339 stream [ kMaybeDestroy ] ( ) ;
335340 } else {
336341 stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
@@ -421,7 +426,7 @@ function onGoawayData(code, lastStreamID, buf) {
421426 // condition on this side of the session that caused the
422427 // shutdown.
423428 session . destroy ( new errors . Error ( 'ERR_HTTP2_SESSION_ERROR' , code ) ,
424- { errorCode : NGHTTP2_NO_ERROR } ) ;
429+ NGHTTP2_NO_ERROR ) ;
425430 }
426431}
427432
@@ -772,6 +777,21 @@ function emitClose(self, error) {
772777 self . emit ( 'close' ) ;
773778}
774779
780+ function finishSessionDestroy ( session , error ) {
781+ const socket = session [ kSocket ] ;
782+ if ( ! socket . destroyed )
783+ socket . destroy ( error ) ;
784+
785+ session [ kProxySocket ] = undefined ;
786+ session [ kSocket ] = undefined ;
787+ session [ kHandle ] = undefined ;
788+ socket [ kSession ] = undefined ;
789+ socket [ kServer ] = undefined ;
790+
791+ // Finally, emit the close and error events (if necessary) on next tick.
792+ process . nextTick ( emitClose , session , error ) ;
793+ }
794+
775795// Upon creation, the Http2Session takes ownership of the socket. The session
776796// may not be ready to use immediately if the socket is not yet fully connected.
777797// In that case, the Http2Session will wait for the socket to connect. Once
@@ -828,6 +848,8 @@ class Http2Session extends EventEmitter {
828848
829849 this [ kState ] = {
830850 flags : SESSION_FLAGS_PENDING ,
851+ goawayCode : null ,
852+ goawayLastStreamID : null ,
831853 streams : new Map ( ) ,
832854 pendingStreams : new Set ( ) ,
833855 pendingAck : 0 ,
@@ -1130,25 +1152,13 @@ class Http2Session extends EventEmitter {
11301152 if ( handle !== undefined )
11311153 handle . destroy ( code , socket . destroyed ) ;
11321154
1133- // If there is no error , use setImmediate to destroy the socket on the
1155+ // If the socket is alive , use setImmediate to destroy the session on the
11341156 // next iteration of the event loop in order to give data time to transmit.
11351157 // Otherwise, destroy immediately.
1136- if ( ! socket . destroyed ) {
1137- if ( ! error ) {
1138- setImmediate ( socket . destroy . bind ( socket ) ) ;
1139- } else {
1140- socket . destroy ( error ) ;
1141- }
1142- }
1143-
1144- this [ kProxySocket ] = undefined ;
1145- this [ kSocket ] = undefined ;
1146- this [ kHandle ] = undefined ;
1147- socket [ kSession ] = undefined ;
1148- socket [ kServer ] = undefined ;
1149-
1150- // Finally, emit the close and error events (if necessary) on next tick.
1151- process . nextTick ( emitClose , this , error ) ;
1158+ if ( ! socket . destroyed )
1159+ setImmediate ( finishSessionDestroy , this , error ) ;
1160+ else
1161+ finishSessionDestroy ( this , error ) ;
11521162 }
11531163
11541164 // Closing the session will:
@@ -1422,11 +1432,8 @@ function afterDoStreamWrite(status, handle, req) {
14221432}
14231433
14241434function streamOnResume ( ) {
1425- if ( ! this . destroyed && ! this . pending ) {
1426- if ( ! this [ kState ] . didRead )
1427- this [ kState ] . didRead = true ;
1435+ if ( ! this . destroyed )
14281436 this [ kHandle ] . readStart ( ) ;
1429- }
14301437}
14311438
14321439function streamOnPause ( ) {
@@ -1441,6 +1448,16 @@ function afterShutdown() {
14411448 stream [ kMaybeDestroy ] ( ) ;
14421449}
14431450
1451+ function finishSendTrailers ( stream , headersList ) {
1452+ stream [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1453+
1454+ const ret = stream [ kHandle ] . trailers ( headersList ) ;
1455+ if ( ret < 0 )
1456+ stream . destroy ( new NghttpError ( ret ) ) ;
1457+ else
1458+ stream [ kMaybeDestroy ] ( ) ;
1459+ }
1460+
14441461function closeStream ( stream , code , shouldSubmitRstStream = true ) {
14451462 const state = stream [ kState ] ;
14461463 state . flags |= STREAM_FLAGS_CLOSED ;
@@ -1502,6 +1519,10 @@ class Http2Stream extends Duplex {
15021519 this [ kSession ] = session ;
15031520 session [ kState ] . pendingStreams . add ( this ) ;
15041521
1522+ // Allow our logic for determining whether any reads have happened to
1523+ // work in all situations. This is similar to what we do in _http_incoming.
1524+ this . _readableState . readingMore = true ;
1525+
15051526 this [ kState ] = {
15061527 didRead : false ,
15071528 flags : STREAM_FLAGS_PENDING ,
@@ -1510,7 +1531,6 @@ class Http2Stream extends Duplex {
15101531 trailersReady : false
15111532 } ;
15121533
1513- this . on ( 'resume' , streamOnResume ) ;
15141534 this . on ( 'pause' , streamOnPause ) ;
15151535 }
15161536
@@ -1717,6 +1737,10 @@ class Http2Stream extends Duplex {
17171737 this . push ( null ) ;
17181738 return ;
17191739 }
1740+ if ( ! this [ kState ] . didRead ) {
1741+ this . _readableState . readingMore = false ;
1742+ this [ kState ] . didRead = true ;
1743+ }
17201744 if ( ! this . pending ) {
17211745 streamOnResume . call ( this ) ;
17221746 } else {
@@ -1765,13 +1789,8 @@ class Http2Stream extends Duplex {
17651789 throw headersList ;
17661790 this [ kSentTrailers ] = headers ;
17671791
1768- this [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1769-
1770- const ret = this [ kHandle ] . trailers ( headersList ) ;
1771- if ( ret < 0 )
1772- this . destroy ( new NghttpError ( ret ) ) ;
1773- else
1774- this [ kMaybeDestroy ] ( ) ;
1792+ // Send the trailers in setImmediate so we don't do it on nghttp2 stack.
1793+ setImmediate ( finishSendTrailers , this , headersList ) ;
17751794 }
17761795
17771796 get closed ( ) {
@@ -1861,15 +1880,15 @@ class Http2Stream extends Duplex {
18611880 }
18621881 // The Http2Stream can be destroyed if it has closed and if the readable
18631882 // side has received the final chunk.
1864- [ kMaybeDestroy ] ( error , code = NGHTTP2_NO_ERROR ) {
1865- if ( error || code !== NGHTTP2_NO_ERROR ) {
1866- this . destroy ( error ) ;
1883+ [ kMaybeDestroy ] ( code = NGHTTP2_NO_ERROR ) {
1884+ if ( code !== NGHTTP2_NO_ERROR ) {
1885+ this . destroy ( ) ;
18671886 return ;
18681887 }
18691888
18701889 // TODO(mcollina): remove usage of _*State properties
1871- if ( this . _writableState . ended && this . _writableState . pendingcb === 0 ) {
1872- if ( this . _readableState . ended && this . closed ) {
1890+ if ( ! this . writable ) {
1891+ if ( ! this . readable && this . closed ) {
18731892 this . destroy ( ) ;
18741893 return ;
18751894 }
@@ -1882,7 +1901,7 @@ class Http2Stream extends Duplex {
18821901 this [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
18831902 ! ( state . flags & STREAM_FLAGS_HAS_TRAILERS ) &&
18841903 ! state . didRead &&
1885- ! this . _readableState . resumeScheduled ) {
1904+ this . readableFlowing === null ) {
18861905 this . close ( ) ;
18871906 }
18881907 }
@@ -2445,6 +2464,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
24452464function socketOnError ( error ) {
24462465 const session = this [ kSession ] ;
24472466 if ( session !== undefined ) {
2467+ // We can ignore ECONNRESET after GOAWAY was received as there's nothing
2468+ // we can do and the other side is fully within its rights to do so.
2469+ if ( error . code === 'ECONNRESET' && session [ kState ] . goawayCode !== null )
2470+ return session . destroy ( ) ;
24482471 debug ( `Http2Session ${ sessionName ( session [ kType ] ) } : socket error [` +
24492472 `${ error . message } ]` ) ;
24502473 session . destroy ( error ) ;
0 commit comments