Skip to content

Commit a43b490

Browse files
fix: propagate errors when using pipelines (#2560) (#2624)
* fix: propagate errors when using pipelines (#2560) * fi test case * fix test case * fix(test): destroy uploadStream after error for node 14 close event --------- Co-authored-by: Thiyagu K <[email protected]>
1 parent bcf58f0 commit a43b490

File tree

2 files changed

+76
-1
lines changed

2 files changed

+76
-1
lines changed

src/file.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2100,6 +2100,10 @@ class File extends ServiceObject<File, FileMetadata> {
21002100

21012101
const emitStream = new PassThroughShim();
21022102

2103+
// If `writeStream` is destroyed before the `writing` event, `emitStream` will not have any listeners. This prevents an unhandled error.
2104+
const noop = () => {};
2105+
emitStream.on('error', noop);
2106+
21032107
let hashCalculatingStream: HashStreamValidator | null = null;
21042108

21052109
if (crc32c || md5) {
@@ -2138,6 +2142,9 @@ class File extends ServiceObject<File, FileMetadata> {
21382142
this.startResumableUpload_(fileWriteStream, options);
21392143
}
21402144

2145+
// remove temporary noop listener as we now create a pipeline that handles the errors
2146+
emitStream.removeListener('error', noop);
2147+
21412148
pipeline(
21422149
emitStream,
21432150
...(transformStreams as [Transform]),

test/file.ts

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,14 @@ import {
2323
} from '../src/nodejs-common/index.js';
2424
import {describe, it, before, beforeEach, afterEach} from 'mocha';
2525
import {PromisifyAllOptions} from '@google-cloud/promisify';
26-
import {Readable, PassThrough, Stream, Duplex, Transform} from 'stream';
26+
import {
27+
Readable,
28+
PassThrough,
29+
Stream,
30+
Duplex,
31+
Transform,
32+
pipeline,
33+
} from 'stream';
2734
import assert from 'assert';
2835
import * as crypto from 'crypto';
2936
import duplexify from 'duplexify';
@@ -2281,6 +2288,67 @@ describe('File', () => {
22812288
writable.end('data');
22822289
});
22832290

2291+
it('should close upstream when pipeline fails', done => {
2292+
const writable: Stream.Writable = file.createWriteStream();
2293+
const error = new Error('My error');
2294+
const uploadStream = new PassThrough();
2295+
2296+
let receivedBytes = 0;
2297+
const validateStream = new PassThrough();
2298+
validateStream.on('data', (chunk: Buffer) => {
2299+
receivedBytes += chunk.length;
2300+
if (receivedBytes > 5) {
2301+
// this aborts the pipeline which should also close the internal pipeline within createWriteStream
2302+
pLine.destroy(error);
2303+
}
2304+
});
2305+
2306+
file.startResumableUpload_ = (dup: duplexify.Duplexify) => {
2307+
dup.setWritable(uploadStream);
2308+
// Emit an error so the pipeline's error-handling logic is triggered
2309+
uploadStream.emit('error', error);
2310+
// Explicitly destroy the stream so that the 'close' event is guaranteed to fire,
2311+
// even in Node v14 where autoDestroy defaults may prevent automatic closing
2312+
uploadStream.destroy();
2313+
};
2314+
2315+
let closed = false;
2316+
uploadStream.on('close', () => {
2317+
closed = true;
2318+
});
2319+
2320+
const pLine = pipeline(
2321+
(function* () {
2322+
yield 'foo'; // write some data
2323+
yield 'foo'; // write some data
2324+
yield 'foo'; // write some data
2325+
})(),
2326+
validateStream,
2327+
writable,
2328+
(e: Error | null) => {
2329+
assert.strictEqual(e, error);
2330+
assert.strictEqual(closed, true);
2331+
done();
2332+
}
2333+
);
2334+
});
2335+
2336+
it('should error pipeline if source stream emits error before any data', done => {
2337+
const writable = file.createWriteStream();
2338+
const error = new Error('Error before first chunk');
2339+
pipeline(
2340+
// eslint-disable-next-line require-yield
2341+
(function* () {
2342+
throw error;
2343+
})(),
2344+
writable,
2345+
(e: Error | null) => {
2346+
assert.strictEqual(e, error);
2347+
done();
2348+
}
2349+
);
2350+
});
2351+
22842352
describe('validation', () => {
22852353
const data = 'test';
22862354

0 commit comments

Comments
 (0)