77 isNodeStream,
88 isReadable,
99 isWritable,
10+ isWebStream,
11+ isTransformStream,
12+ isWritableStream,
13+ isReadableStream,
1014} = require ( 'internal/streams/utils' ) ;
1115const {
1216 AbortError,
@@ -15,6 +19,7 @@ const {
1519 ERR_MISSING_ARGS ,
1620 } ,
1721} = require ( 'internal/errors' ) ;
22+ const eos = require ( 'internal/streams/end-of-stream' ) ;
1823
1924module . exports = function compose ( ...streams ) {
2025 if ( streams . length === 0 ) {
@@ -37,18 +42,32 @@ module.exports = function compose(...streams) {
3742 }
3843
3944 for ( let n = 0 ; n < streams . length ; ++ n ) {
40- if ( ! isNodeStream ( streams [ n ] ) ) {
45+ if ( ! isNodeStream ( streams [ n ] ) && ! isWebStream ( streams [ n ] ) ) {
4146 // TODO(ronag): Add checks for non streams.
4247 continue ;
4348 }
44- if ( n < streams . length - 1 && ! isReadable ( streams [ n ] ) ) {
49+ if (
50+ n < streams . length - 1 &&
51+ ! (
52+ isReadable ( streams [ n ] ) ||
53+ isReadableStream ( streams [ n ] ) ||
54+ isTransformStream ( streams [ n ] )
55+ )
56+ ) {
4557 throw new ERR_INVALID_ARG_VALUE (
4658 `streams[${ n } ]` ,
4759 orgStreams [ n ] ,
4860 'must be readable'
4961 ) ;
5062 }
51- if ( n > 0 && ! isWritable ( streams [ n ] ) ) {
63+ if (
64+ n > 0 &&
65+ ! (
66+ isWritable ( streams [ n ] ) ||
67+ isWritableStream ( streams [ n ] ) ||
68+ isTransformStream ( streams [ n ] )
69+ )
70+ ) {
5271 throw new ERR_INVALID_ARG_VALUE (
5372 `streams[${ n } ]` ,
5473 orgStreams [ n ] ,
@@ -79,8 +98,16 @@ module.exports = function compose(...streams) {
7998 const head = streams [ 0 ] ;
8099 const tail = pipeline ( streams , onfinished ) ;
81100
82- const writable = ! ! isWritable ( head ) ;
83- const readable = ! ! isReadable ( tail ) ;
101+ const writable = ! ! (
102+ isWritable ( head ) ||
103+ isWritableStream ( head ) ||
104+ isTransformStream ( head )
105+ ) ;
106+ const readable = ! ! (
107+ isReadable ( tail ) ||
108+ isReadableStream ( tail ) ||
109+ isTransformStream ( tail )
110+ ) ;
84111
85112 // TODO(ronag): Avoid double buffering.
86113 // Implement Writable/Readable/Duplex traits.
@@ -94,28 +121,55 @@ module.exports = function compose(...streams) {
94121 } ) ;
95122
96123 if ( writable ) {
97- d . _write = function ( chunk , encoding , callback ) {
98- if ( head . write ( chunk , encoding ) ) {
99- callback ( ) ;
100- } else {
101- ondrain = callback ;
102- }
103- } ;
104-
105- d . _final = function ( callback ) {
106- head . end ( ) ;
107- onfinish = callback ;
108- } ;
124+ if ( isNodeStream ( head ) ) {
125+ d . _write = function ( chunk , encoding , callback ) {
126+ if ( head . write ( chunk , encoding ) ) {
127+ callback ( ) ;
128+ } else {
129+ ondrain = callback ;
130+ }
131+ } ;
132+
133+ d . _final = function ( callback ) {
134+ head . end ( ) ;
135+ onfinish = callback ;
136+ } ;
137+
138+ head . on ( 'drain' , function ( ) {
139+ if ( ondrain ) {
140+ const cb = ondrain ;
141+ ondrain = null ;
142+ cb ( ) ;
143+ }
144+ } ) ;
145+ } else if ( isWebStream ( head ) ) {
146+ const writable = isTransformStream ( head ) ? head . writable : head ;
147+ const writer = writable . getWriter ( ) ;
148+
149+ d . _write = async function ( chunk , encoding , callback ) {
150+ try {
151+ await writer . ready ;
152+ writer . write ( chunk ) . catch ( ( ) => { } ) ;
153+ callback ( ) ;
154+ } catch ( err ) {
155+ callback ( err ) ;
156+ }
157+ } ;
158+
159+ d . _final = async function ( callback ) {
160+ try {
161+ await writer . ready ;
162+ writer . close ( ) . catch ( ( ) => { } ) ;
163+ onfinish = callback ;
164+ } catch ( err ) {
165+ callback ( err ) ;
166+ }
167+ } ;
168+ }
109169
110- head . on ( 'drain' , function ( ) {
111- if ( ondrain ) {
112- const cb = ondrain ;
113- ondrain = null ;
114- cb ( ) ;
115- }
116- } ) ;
170+ const toRead = isTransformStream ( tail ) ? tail . readable : tail ;
117171
118- tail . on ( 'finish' , function ( ) {
172+ eos ( toRead , ( ) => {
119173 if ( onfinish ) {
120174 const cb = onfinish ;
121175 onfinish = null ;
@@ -125,32 +179,54 @@ module.exports = function compose(...streams) {
125179 }
126180
127181 if ( readable ) {
128- tail . on ( 'readable' , function ( ) {
129- if ( onreadable ) {
130- const cb = onreadable ;
131- onreadable = null ;
132- cb ( ) ;
133- }
134- } ) ;
135-
136- tail . on ( 'end' , function ( ) {
137- d . push ( null ) ;
138- } ) ;
139-
140- d . _read = function ( ) {
141- while ( true ) {
142- const buf = tail . read ( ) ;
143-
144- if ( buf === null ) {
145- onreadable = d . _read ;
146- return ;
182+ if ( isNodeStream ( tail ) ) {
183+ tail . on ( 'readable' , function ( ) {
184+ if ( onreadable ) {
185+ const cb = onreadable ;
186+ onreadable = null ;
187+ cb ( ) ;
147188 }
148-
149- if ( ! d . push ( buf ) ) {
150- return ;
189+ } ) ;
190+
191+ tail . on ( 'end' , function ( ) {
192+ d . push ( null ) ;
193+ } ) ;
194+
195+ d . _read = function ( ) {
196+ while ( true ) {
197+ const buf = tail . read ( ) ;
198+ if ( buf === null ) {
199+ onreadable = d . _read ;
200+ return ;
201+ }
202+
203+ if ( ! d . push ( buf ) ) {
204+ return ;
205+ }
151206 }
152- }
153- } ;
207+ } ;
208+ } else if ( isWebStream ( tail ) ) {
209+ const readable = isTransformStream ( tail ) ? tail . readable : tail ;
210+ const reader = readable . getReader ( ) ;
211+ d . _read = async function ( ) {
212+ while ( true ) {
213+ try {
214+ const { value, done } = await reader . read ( ) ;
215+
216+ if ( ! d . push ( value ) ) {
217+ return ;
218+ }
219+
220+ if ( done ) {
221+ d . push ( null ) ;
222+ return ;
223+ }
224+ } catch {
225+ return ;
226+ }
227+ }
228+ } ;
229+ }
154230 }
155231
156232 d . _destroy = function ( err , callback ) {
@@ -166,7 +242,9 @@ module.exports = function compose(...streams) {
166242 callback ( err ) ;
167243 } else {
168244 onclose = callback ;
169- destroyer ( tail , err ) ;
245+ if ( isNodeStream ( tail ) ) {
246+ destroyer ( tail , err ) ;
247+ }
170248 }
171249 } ;
172250
0 commit comments