@@ -7,12 +7,19 @@ import { Socket } from "./socket";
77import debugModule from "debug" ;
88import { serialize } from "cookie" ;
99import { Server as DEFAULT_WS_ENGINE } from "ws" ;
10- import { IncomingMessage , Server as HttpServer } from "http" ;
11- import { CookieSerializeOptions } from "cookie" ;
12- import { CorsOptions , CorsOptionsDelegate } from "cors" ;
10+ import type {
11+ IncomingMessage ,
12+ Server as HttpServer ,
13+ ServerResponse ,
14+ } from "http" ;
15+ import type { CookieSerializeOptions } from "cookie" ;
16+ import type { CorsOptions , CorsOptionsDelegate } from "cors" ;
17+ import type { Duplex } from "stream" ;
1318
1419const debug = debugModule ( "engine" ) ;
1520
21+ const kResponseHeaders = Symbol ( "responseHeaders" ) ;
22+
1623type Transport = "polling" | "websocket" ;
1724
1825export interface AttachOptions {
@@ -119,12 +126,26 @@ export interface ServerOptions {
119126 allowEIO3 ?: boolean ;
120127}
121128
129+ /**
130+ * An Express-compatible middleware.
131+ *
132+ * Middleware functions are functions that have access to the request object (req), the response object (res), and the
133+ * next middleware function in the application’s request-response cycle.
134+ *
135+ * @see https://expressjs.com/en/guide/using-middleware.html
136+ */
137+ type Middleware = (
138+ req : IncomingMessage ,
139+ res : ServerResponse ,
140+ next : ( ) => void
141+ ) => void ;
142+
122143export abstract class BaseServer extends EventEmitter {
123144 public opts : ServerOptions ;
124145
125146 protected clients : any ;
126147 private clientsCount : number ;
127- protected corsMiddleware : Function ;
148+ protected middlewares : Middleware [ ] = [ ] ;
128149
129150 /**
130151 * Server constructor.
@@ -170,7 +191,7 @@ export abstract class BaseServer extends EventEmitter {
170191 }
171192
172193 if ( this . opts . cors ) {
173- this . corsMiddleware = require ( "cors" ) ( this . opts . cors ) ;
194+ this . use ( require ( "cors" ) ( this . opts . cors ) ) ;
174195 }
175196
176197 if ( opts . perMessageDeflate ) {
@@ -289,6 +310,52 @@ export abstract class BaseServer extends EventEmitter {
289310 fn ( ) ;
290311 }
291312
313+ /**
314+ * Adds a new middleware.
315+ *
316+ * @example
317+ * import helmet from "helmet";
318+ *
319+ * engine.use(helmet());
320+ *
321+ * @param fn
322+ */
323+ public use ( fn : Middleware ) {
324+ this . middlewares . push ( fn ) ;
325+ }
326+
327+ /**
328+ * Apply the middlewares to the request.
329+ *
330+ * @param req
331+ * @param res
332+ * @param callback
333+ * @protected
334+ */
335+ protected _applyMiddlewares (
336+ req : IncomingMessage ,
337+ res : ServerResponse ,
338+ callback : ( ) => void
339+ ) {
340+ if ( this . middlewares . length === 0 ) {
341+ debug ( "no middleware to apply, skipping" ) ;
342+ return callback ( ) ;
343+ }
344+
345+ const apply = ( i ) => {
346+ debug ( "applying middleware n°%d" , i + 1 ) ;
347+ this . middlewares [ i ] ( req , res , ( ) => {
348+ if ( i + 1 < this . middlewares . length ) {
349+ apply ( i + 1 ) ;
350+ } else {
351+ callback ( ) ;
352+ }
353+ } ) ;
354+ } ;
355+
356+ apply ( 0 ) ;
357+ }
358+
292359 /**
293360 * Closes all clients.
294361 *
@@ -449,6 +516,40 @@ export abstract class BaseServer extends EventEmitter {
449516 } ;
450517}
451518
519+ /**
520+ * Exposes a subset of the http.ServerResponse interface, in order to be able to apply the middlewares to an upgrade
521+ * request.
522+ *
523+ * @see https://nodejs.org/api/http.html#class-httpserverresponse
524+ */
525+ class WebSocketResponse {
526+ constructor ( readonly req , readonly socket : Duplex ) {
527+ // temporarily store the response headers on the req object (see the "headers" event)
528+ req [ kResponseHeaders ] = { } ;
529+ }
530+
531+ public setHeader ( name : string , value : any ) {
532+ this . req [ kResponseHeaders ] [ name ] = value ;
533+ }
534+
535+ public getHeader ( name : string ) {
536+ return this . req [ kResponseHeaders ] [ name ] ;
537+ }
538+
539+ public removeHeader ( name : string ) {
540+ delete this . req [ kResponseHeaders ] [ name ] ;
541+ }
542+
543+ public write ( ) { }
544+
545+ public writeHead ( ) { }
546+
547+ public end ( ) {
548+ // we could return a proper error code, but the WebSocket client will emit an "error" event anyway.
549+ this . socket . destroy ( ) ;
550+ }
551+ }
552+
452553export class Server extends BaseServer {
453554 public httpServer ?: HttpServer ;
454555 private ws : any ;
@@ -474,7 +575,8 @@ export class Server extends BaseServer {
474575 this . ws . on ( "headers" , ( headersArray , req ) => {
475576 // note: 'ws' uses an array of headers, while Engine.IO uses an object (response.writeHead() accepts both formats)
476577 // we could also try to parse the array and then sync the values, but that will be error-prone
477- const additionalHeaders = { } ;
578+ const additionalHeaders = req [ kResponseHeaders ] || { } ;
579+ delete req [ kResponseHeaders ] ;
478580
479581 const isInitialRequest = ! req . _query . sid ;
480582 if ( isInitialRequest ) {
@@ -483,6 +585,7 @@ export class Server extends BaseServer {
483585
484586 this . emit ( "headers" , additionalHeaders , req ) ;
485587
588+ debug ( "writing headers: %j" , additionalHeaders ) ;
486589 Object . keys ( additionalHeaders ) . forEach ( ( key ) => {
487590 headersArray . push ( `${ key } : ${ additionalHeaders [ key ] } ` ) ;
488591 } ) ;
@@ -517,13 +620,14 @@ export class Server extends BaseServer {
517620 /**
518621 * Handles an Engine.IO HTTP request.
519622 *
520- * @param {http. IncomingMessage } request
521- * @param {http. ServerResponse|http.OutgoingMessage } response
623+ * @param {IncomingMessage } req
624+ * @param {ServerResponse } res
522625 * @api public
523626 */
524- public handleRequest ( req , res ) {
627+ public handleRequest ( req : IncomingMessage , res : ServerResponse ) {
525628 debug ( 'handling "%s" http request "%s"' , req . method , req . url ) ;
526629 this . prepare ( req ) ;
630+ // @ts -ignore
527631 req . res = res ;
528632
529633 const callback = ( errorCode , errorContext ) => {
@@ -538,51 +642,62 @@ export class Server extends BaseServer {
538642 return ;
539643 }
540644
645+ // @ts -ignore
541646 if ( req . _query . sid ) {
542647 debug ( "setting new request for existing client" ) ;
648+ // @ts -ignore
543649 this . clients [ req . _query . sid ] . transport . onRequest ( req ) ;
544650 } else {
545651 const closeConnection = ( errorCode , errorContext ) =>
546652 abortRequest ( res , errorCode , errorContext ) ;
653+ // @ts -ignore
547654 this . handshake ( req . _query . transport , req , closeConnection ) ;
548655 }
549656 } ;
550657
551- if ( this . corsMiddleware ) {
552- this . corsMiddleware . call ( null , req , res , ( ) => {
553- this . verify ( req , false , callback ) ;
554- } ) ;
555- } else {
658+ this . _applyMiddlewares ( req , res , ( ) => {
556659 this . verify ( req , false , callback ) ;
557- }
660+ } ) ;
558661 }
559662
560663 /**
561664 * Handles an Engine.IO HTTP Upgrade.
562665 *
563666 * @api public
564667 */
565- public handleUpgrade ( req , socket , upgradeHead ) {
668+ public handleUpgrade (
669+ req : IncomingMessage ,
670+ socket : Duplex ,
671+ upgradeHead : Buffer
672+ ) {
566673 this . prepare ( req ) ;
567674
568- this . verify ( req , true , ( errorCode , errorContext ) => {
569- if ( errorCode ) {
570- this . emit ( "connection_error" , {
571- req,
572- code : errorCode ,
573- message : Server . errorMessages [ errorCode ] ,
574- context : errorContext ,
575- } ) ;
576- abortUpgrade ( socket , errorCode , errorContext ) ;
577- return ;
578- }
675+ const res = new WebSocketResponse ( req , socket ) ;
579676
580- const head = Buffer . from ( upgradeHead ) ;
581- upgradeHead = null ;
677+ this . _applyMiddlewares ( req , res as unknown as ServerResponse , ( ) => {
678+ this . verify ( req , true , ( errorCode , errorContext ) => {
679+ if ( errorCode ) {
680+ this . emit ( "connection_error" , {
681+ req,
682+ code : errorCode ,
683+ message : Server . errorMessages [ errorCode ] ,
684+ context : errorContext ,
685+ } ) ;
686+ abortUpgrade ( socket , errorCode , errorContext ) ;
687+ return ;
688+ }
582689
583- // delegate to ws
584- this . ws . handleUpgrade ( req , socket , head , ( websocket ) => {
585- this . onWebSocket ( req , socket , websocket ) ;
690+ const head = Buffer . from ( upgradeHead ) ;
691+ upgradeHead = null ;
692+
693+ // some middlewares (like express-session) wait for the writeHead() call to flush their headers
694+ // see https://github.com/expressjs/session/blob/1010fadc2f071ddf2add94235d72224cf65159c6/index.js#L220-L244
695+ res . writeHead ( ) ;
696+
697+ // delegate to ws
698+ this . ws . handleUpgrade ( req , socket , head , ( websocket ) => {
699+ this . onWebSocket ( req , socket , websocket ) ;
700+ } ) ;
586701 } ) ;
587702 } ) ;
588703 }
0 commit comments