@@ -341,20 +341,25 @@ function onStreamClose(code) {
341341
342342 stream [ kState ] . fd = - 1 ;
343343 // Defer destroy we actually emit end.
344- if ( stream . _readableState . endEmitted || code !== NGHTTP2_NO_ERROR ) {
344+ if ( ! stream . readable || code !== NGHTTP2_NO_ERROR ) {
345345 // If errored or ended, we can destroy immediately.
346- stream [ kMaybeDestroy ] ( null , code ) ;
346+ stream [ kMaybeDestroy ] ( code ) ;
347347 } else {
348348 // Wait for end to destroy.
349349 stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
350350 // Push a null so the stream can end whenever the client consumes
351351 // it completely.
352352 stream . push ( null ) ;
353- // If the client hasn't tried to consume the stream and there is no
354- // resume scheduled (which would indicate they would consume in the future),
355- // then just dump the incoming data so that the stream can be destroyed.
356- if ( ! stream [ kState ] . didRead && ! stream . _readableState . resumeScheduled )
353+
354+ // If the user hasn't tried to consume the stream (and this is a server
355+ // session) then just dump the incoming data so that the stream can
356+ // be destroyed.
357+ if ( stream [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
358+ ! stream [ kState ] . didRead &&
359+ stream . readableFlowing === null )
357360 stream . resume ( ) ;
361+ else
362+ stream . read ( 0 ) ;
358363 }
359364}
360365
@@ -379,7 +384,7 @@ function onStreamRead(nread, buf) {
379384 `${ sessionName ( stream [ kSession ] [ kType ] ) } ]: ending readable.` ) ;
380385
381386 // defer this until we actually emit end
382- if ( stream . _readableState . endEmitted ) {
387+ if ( ! stream . readable ) {
383388 stream [ kMaybeDestroy ] ( ) ;
384389 } else {
385390 stream . on ( 'end' , stream [ kMaybeDestroy ] ) ;
@@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) {
469474 // goaway using NGHTTP2_NO_ERROR because there was no error
470475 // condition on this side of the session that caused the
471476 // shutdown.
472- session . destroy ( new ERR_HTTP2_SESSION_ERROR ( code ) ,
473- { errorCode : NGHTTP2_NO_ERROR } ) ;
477+ session . destroy ( new ERR_HTTP2_SESSION_ERROR ( code ) , NGHTTP2_NO_ERROR ) ;
474478 }
475479}
476480
@@ -813,6 +817,21 @@ function emitClose(self, error) {
813817 self . emit ( 'close' ) ;
814818}
815819
820+ function finishSessionDestroy ( session , error ) {
821+ const socket = session [ kSocket ] ;
822+ if ( ! socket . destroyed )
823+ socket . destroy ( error ) ;
824+
825+ session [ kProxySocket ] = undefined ;
826+ session [ kSocket ] = undefined ;
827+ session [ kHandle ] = undefined ;
828+ socket [ kSession ] = undefined ;
829+ socket [ kServer ] = undefined ;
830+
831+ // Finally, emit the close and error events (if necessary) on next tick.
832+ process . nextTick ( emitClose , session , error ) ;
833+ }
834+
816835// Upon creation, the Http2Session takes ownership of the socket. The session
817836// may not be ready to use immediately if the socket is not yet fully connected.
818837// In that case, the Http2Session will wait for the socket to connect. Once
@@ -869,6 +888,8 @@ class Http2Session extends EventEmitter {
869888
870889 this [ kState ] = {
871890 flags : SESSION_FLAGS_PENDING ,
891+ goawayCode : null ,
892+ goawayLastStreamID : null ,
872893 streams : new Map ( ) ,
873894 pendingStreams : new Set ( ) ,
874895 pendingAck : 0 ,
@@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter {
11711192 if ( handle !== undefined )
11721193 handle . destroy ( code , socket . destroyed ) ;
11731194
1174- // If there is no error , use setImmediate to destroy the socket on the
1195+ // If the socket is alive , use setImmediate to destroy the session on the
11751196 // next iteration of the event loop in order to give data time to transmit.
11761197 // Otherwise, destroy immediately.
1177- if ( ! socket . destroyed ) {
1178- if ( ! error ) {
1179- setImmediate ( socket . destroy . bind ( socket ) ) ;
1180- } else {
1181- socket . destroy ( error ) ;
1182- }
1183- }
1184-
1185- this [ kProxySocket ] = undefined ;
1186- this [ kSocket ] = undefined ;
1187- this [ kHandle ] = undefined ;
1188- socket [ kSession ] = undefined ;
1189- socket [ kServer ] = undefined ;
1190-
1191- // Finally, emit the close and error events (if necessary) on next tick.
1192- process . nextTick ( emitClose , this , error ) ;
1198+ if ( ! socket . destroyed )
1199+ setImmediate ( finishSessionDestroy , this , error ) ;
1200+ else
1201+ finishSessionDestroy ( this , error ) ;
11931202 }
11941203
11951204 // Closing the session will:
@@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) {
14411450}
14421451
14431452function streamOnResume ( ) {
1444- if ( ! this . destroyed && ! this . pending ) {
1445- if ( ! this [ kState ] . didRead )
1446- this [ kState ] . didRead = true ;
1453+ if ( ! this . destroyed )
14471454 this [ kHandle ] . readStart ( ) ;
1448- }
14491455}
14501456
14511457function streamOnPause ( ) {
@@ -1460,6 +1466,16 @@ function afterShutdown() {
14601466 stream [ kMaybeDestroy ] ( ) ;
14611467}
14621468
1469+ function finishSendTrailers ( stream , headersList ) {
1470+ stream [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1471+
1472+ const ret = stream [ kHandle ] . trailers ( headersList ) ;
1473+ if ( ret < 0 )
1474+ stream . destroy ( new NghttpError ( ret ) ) ;
1475+ else
1476+ stream [ kMaybeDestroy ] ( ) ;
1477+ }
1478+
14631479function closeStream ( stream , code , shouldSubmitRstStream = true ) {
14641480 const state = stream [ kState ] ;
14651481 state . flags |= STREAM_FLAGS_CLOSED ;
@@ -1521,6 +1537,10 @@ class Http2Stream extends Duplex {
15211537 this [ kSession ] = session ;
15221538 session [ kState ] . pendingStreams . add ( this ) ;
15231539
1540+ // Allow our logic for determining whether any reads have happened to
1541+ // work in all situations. This is similar to what we do in _http_incoming.
1542+ this . _readableState . readingMore = true ;
1543+
15241544 this [ kTimeout ] = null ;
15251545
15261546 this [ kState ] = {
@@ -1531,7 +1551,6 @@ class Http2Stream extends Duplex {
15311551 trailersReady : false
15321552 } ;
15331553
1534- this . on ( 'resume' , streamOnResume ) ;
15351554 this . on ( 'pause' , streamOnPause ) ;
15361555 }
15371556
@@ -1725,6 +1744,10 @@ class Http2Stream extends Duplex {
17251744 this . push ( null ) ;
17261745 return ;
17271746 }
1747+ if ( ! this [ kState ] . didRead ) {
1748+ this . _readableState . readingMore = false ;
1749+ this [ kState ] . didRead = true ;
1750+ }
17281751 if ( ! this . pending ) {
17291752 streamOnResume . call ( this ) ;
17301753 } else {
@@ -1773,13 +1796,8 @@ class Http2Stream extends Duplex {
17731796 throw headersList ;
17741797 this [ kSentTrailers ] = headers ;
17751798
1776- this [ kState ] . flags &= ~ STREAM_FLAGS_HAS_TRAILERS ;
1777-
1778- const ret = this [ kHandle ] . trailers ( headersList ) ;
1779- if ( ret < 0 )
1780- this . destroy ( new NghttpError ( ret ) ) ;
1781- else
1782- this [ kMaybeDestroy ] ( ) ;
1799+ // Send the trailers in setImmediate so we don't do it on nghttp2 stack.
1800+ setImmediate ( finishSendTrailers , this , headersList ) ;
17831801 }
17841802
17851803 get closed ( ) {
@@ -1866,15 +1884,15 @@ class Http2Stream extends Duplex {
18661884 }
18671885 // The Http2Stream can be destroyed if it has closed and if the readable
18681886 // side has received the final chunk.
1869- [ kMaybeDestroy ] ( error , code = NGHTTP2_NO_ERROR ) {
1870- if ( error || code !== NGHTTP2_NO_ERROR ) {
1871- this . destroy ( error ) ;
1887+ [ kMaybeDestroy ] ( code = NGHTTP2_NO_ERROR ) {
1888+ if ( code !== NGHTTP2_NO_ERROR ) {
1889+ this . destroy ( ) ;
18721890 return ;
18731891 }
18741892
18751893 // TODO(mcollina): remove usage of _*State properties
1876- if ( this . _writableState . ended && this . _writableState . pendingcb === 0 ) {
1877- if ( this . _readableState . ended && this . closed ) {
1894+ if ( ! this . writable ) {
1895+ if ( ! this . readable && this . closed ) {
18781896 this . destroy ( ) ;
18791897 return ;
18801898 }
@@ -1887,7 +1905,7 @@ class Http2Stream extends Duplex {
18871905 this [ kSession ] [ kType ] === NGHTTP2_SESSION_SERVER &&
18881906 ! ( state . flags & STREAM_FLAGS_HAS_TRAILERS ) &&
18891907 ! state . didRead &&
1890- ! this . _readableState . resumeScheduled ) {
1908+ this . readableFlowing === null ) {
18911909 this . close ( ) ;
18921910 }
18931911 }
@@ -2477,6 +2495,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
24772495function socketOnError ( error ) {
24782496 const session = this [ kSession ] ;
24792497 if ( session !== undefined ) {
2498+ // We can ignore ECONNRESET after GOAWAY was received as there's nothing
2499+ // we can do and the other side is fully within its rights to do so.
2500+ if ( error . code === 'ECONNRESET' && session [ kState ] . goawayCode !== null )
2501+ return session . destroy ( ) ;
24802502 debug ( `Http2Session ${ sessionName ( session [ kType ] ) } : socket error [` +
24812503 `${ error . message } ]` ) ;
24822504 session . destroy ( error ) ;
0 commit comments