@@ -35,6 +35,9 @@ const {
3535 isReadable,
3636 isReadableNodeStream,
3737 isNodeStream,
38+ isTransformStream,
39+ isWebStream,
40+ isReadableStream,
3841} = require ( 'internal/streams/utils' ) ;
3942const { AbortController } = require ( 'internal/abort_controller' ) ;
4043
@@ -88,7 +91,7 @@ async function* fromReadable(val) {
8891 yield * Readable . prototype [ SymbolAsyncIterator ] . call ( val ) ;
8992}
9093
91- async function pump ( iterable , writable , finish , { end } ) {
94+ async function pumpToNode ( iterable , writable , finish , { end } ) {
9295 let error ;
9396 let onresolve = null ;
9497
@@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
147150 }
148151}
149152
153+ async function pumpToWeb ( readable , writable , finish , { end } ) {
154+ if ( isTransformStream ( writable ) ) {
155+ writable = writable . writable ;
156+ }
157+ // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
158+ const writer = writable . getWriter ( ) ;
159+ try {
160+ for await ( const chunk of readable ) {
161+ await writer . ready ;
162+ writer . write ( chunk ) . catch ( ( ) => { } ) ;
163+ }
164+
165+ await writer . ready ;
166+
167+ if ( end ) {
168+ await writer . close ( ) ;
169+ }
170+
171+ finish ( ) ;
172+ } catch ( err ) {
173+ try {
174+ await writer . abort ( err ) ;
175+ finish ( err ) ;
176+ } catch ( err ) {
177+ finish ( err ) ;
178+ }
179+ }
180+ }
181+
150182function pipeline ( ...streams ) {
151183 return pipelineImpl ( streams , once ( popCallback ( streams ) ) ) ;
152184}
@@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
259291 ret = Duplex . from ( stream ) ;
260292 }
261293 } else if ( typeof stream === 'function' ) {
262- ret = makeAsyncIterable ( ret ) ;
294+ if ( isTransformStream ( ret ) ) {
295+ ret = makeAsyncIterable ( ret ?. readable ) ;
296+ } else {
297+ ret = makeAsyncIterable ( ret ) ;
298+ }
263299 ret = stream ( ret , { signal } ) ;
264300
265301 if ( reading ) {
@@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
303339 ) ;
304340 } else if ( isIterable ( ret , true ) ) {
305341 finishCount ++ ;
306- pump ( ret , pt , finish , { end } ) ;
342+ pumpToNode ( ret , pt , finish , { end } ) ;
343+ } else if ( isReadableStream ( ret ) || isTransformStream ( ret ) ) {
344+ const toRead = ret . readable || ret ;
345+ finishCount ++ ;
346+ pumpToNode ( toRead , pt , finish , { end } ) ;
307347 } else {
308348 throw new ERR_INVALID_RETURN_VALUE (
309349 'AsyncIterable or Promise' , 'destination' , ret ) ;
@@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
324364 if ( isReadable ( stream ) && isLastStream ) {
325365 lastStreamCleanup . push ( cleanup ) ;
326366 }
367+ } else if ( isTransformStream ( ret ) || isReadableStream ( ret ) ) {
368+ const toRead = ret . readable || ret ;
369+ finishCount ++ ;
370+ pumpToNode ( toRead , stream , finish , { end } ) ;
327371 } else if ( isIterable ( ret ) ) {
328372 finishCount ++ ;
329- pump ( ret , stream , finish , { end } ) ;
373+ pumpToNode ( ret , stream , finish , { end } ) ;
374+ } else {
375+ throw new ERR_INVALID_ARG_TYPE (
376+ 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' , 'ReadableStream' , 'TransformStream' ] , ret ) ;
377+ }
378+ ret = stream ;
379+ } else if ( isWebStream ( stream ) ) {
380+ if ( isReadableNodeStream ( ret ) ) {
381+ finishCount ++ ;
382+ pumpToWeb ( makeAsyncIterable ( ret ) , stream , finish , { end } ) ;
383+ } else if ( isReadableStream ( ret ) || isIterable ( ret ) ) {
384+ finishCount ++ ;
385+ pumpToWeb ( ret , stream , finish , { end } ) ;
386+ } else if ( isTransformStream ( ret ) ) {
387+ pumpToWeb ( ret . readable , stream , finish , { end } ) ;
330388 } else {
331389 throw new ERR_INVALID_ARG_TYPE (
332- 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' ] , ret ) ;
390+ 'val' , [ 'Readable' , 'Iterable' , 'AsyncIterable' , 'ReadableStream' , 'TransformStream' ] , ret ) ;
333391 }
334392 ret = stream ;
335393 } else {
0 commit comments