Skip to content

Commit c0d9d58

Browse files
feat!: support iterables in file#save (#2202) (#2203)
1 parent 5eae660 commit c0d9d58

File tree

2 files changed

+262
-25
lines changed

2 files changed

+262
-25
lines changed

src/file.ts

Lines changed: 71 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,14 @@ import * as crypto from 'crypto';
2929
import * as fs from 'fs';
3030
import * as mime from 'mime';
3131
import * as resumableUpload from './resumable-upload';
32-
import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream';
32+
import {
33+
Writable,
34+
Readable,
35+
pipeline,
36+
Transform,
37+
PassThrough,
38+
PipelineSource,
39+
} from 'stream';
3340
import * as zlib from 'zlib';
3441
import * as http from 'http';
3542

@@ -93,6 +100,8 @@ export interface PolicyDocument {
93100
signature: string;
94101
}
95102

103+
export type SaveData = string | Buffer | PipelineSource<string | Buffer>;
104+
96105
export type GenerateSignedPostPolicyV2Response = [PolicyDocument];
97106

98107
export interface GenerateSignedPostPolicyV2Callback {
@@ -454,6 +463,7 @@ export enum FileExceptionMessages {
454463
UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
455464
As a precaution, the file has been deleted.
456465
To be sure the content is the same, you should try uploading the file again.`,
466+
STREAM_NOT_READABLE = 'Stream must be readable.',
457467
}
458468

459469
/**
@@ -3557,13 +3567,9 @@ class File extends ServiceObject<File, FileMetadata> {
35573567
this.copy(newFile, copyOptions, callback!);
35583568
}
35593569

3560-
save(data: string | Buffer, options?: SaveOptions): Promise<void>;
3561-
save(data: string | Buffer, callback: SaveCallback): void;
3562-
save(
3563-
data: string | Buffer,
3564-
options: SaveOptions,
3565-
callback: SaveCallback
3566-
): void;
3570+
save(data: SaveData, options?: SaveOptions): Promise<void>;
3571+
save(data: SaveData, callback: SaveCallback): void;
3572+
save(data: SaveData, options: SaveOptions, callback: SaveCallback): void;
35673573
/**
35683574
* @typedef {object} SaveOptions
35693575
* @extends CreateWriteStreamOptions
@@ -3590,7 +3596,7 @@ class File extends ServiceObject<File, FileMetadata> {
35903596
* resumable feature is disabled.
35913597
* </p>
35923598
*
3593-
* @param {string | Buffer} data The data to write to a file.
3599+
* @param {SaveData} data The data to write to a file.
35943600
* @param {SaveOptions} [options] See {@link File#createWriteStream}'s `options`
35953601
* parameter.
35963602
* @param {SaveCallback} [callback] Callback function.
@@ -3618,7 +3624,7 @@ class File extends ServiceObject<File, FileMetadata> {
36183624
* ```
36193625
*/
36203626
save(
3621-
data: string | Buffer,
3627+
data: SaveData,
36223628
optionsOrCallback?: SaveOptions | SaveCallback,
36233629
callback?: SaveCallback
36243630
): Promise<void> | void {
@@ -3638,28 +3644,68 @@ class File extends ServiceObject<File, FileMetadata> {
36383644
}
36393645
const returnValue = retry(
36403646
async (bail: (err: Error) => void) => {
3641-
await new Promise<void>((resolve, reject) => {
3647+
if (data instanceof Readable) {
3648+
// Make sure any pending async readable operations are finished before
3649+
// attempting to check if the stream is readable.
3650+
await new Promise(resolve => setImmediate(resolve));
3651+
3652+
if (!data.readable || data.destroyed) {
3653+
// Calling pipeline() with a non-readable stream will result in the
3654+
// callback being called without an error, and no piping taking
3655+
// place. In that case, file.save() would appear to succeed, but
3656+
// nothing would be uploaded.
3657+
return bail(new Error(FileExceptionMessages.STREAM_NOT_READABLE));
3658+
}
3659+
}
3660+
3661+
return new Promise<void>((resolve, reject) => {
36423662
if (maxRetries === 0) {
36433663
this.storage.retryOptions.autoRetry = false;
36443664
}
3645-
const writable = this.createWriteStream(options)
3646-
.on('error', err => {
3647-
if (
3648-
this.storage.retryOptions.autoRetry &&
3649-
this.storage.retryOptions.retryableErrorFn!(err)
3650-
) {
3651-
return reject(err);
3665+
const writable = this.createWriteStream(options);
3666+
3667+
if (options.onUploadProgress) {
3668+
writable.on('progress', options.onUploadProgress);
3669+
}
3670+
3671+
const handleError = (err: Error) => {
3672+
if (
3673+
!this.storage.retryOptions.autoRetry ||
3674+
!this.storage.retryOptions.retryableErrorFn!(err)
3675+
) {
3676+
bail(err);
3677+
}
3678+
3679+
reject(err);
3680+
};
3681+
3682+
if (typeof data === 'string' || Buffer.isBuffer(data)) {
3683+
writable
3684+
.on('error', handleError)
3685+
.on('finish', () => resolve())
3686+
.end(data);
3687+
} else {
3688+
pipeline(data, writable, err => {
3689+
if (err) {
3690+
// If data is not a valid PipelineSource, then pipeline will
3691+
// fail without destroying the writable stream. If data is a
3692+
// PipelineSource that yields invalid chunks (e.g. a stream in
3693+
// object mode or an iterable that does not yield Buffers or
3694+
// strings), then pipeline will destroy the writable stream.
3695+
if (!writable.destroyed) writable.destroy();
3696+
3697+
if (typeof data !== 'function') {
3698+
// Only PipelineSourceFunction can be retried. Async-iterables
3699+
// and Readable streams can only be consumed once.
3700+
bail(err);
3701+
}
3702+
3703+
handleError(err);
36523704
} else {
3653-
return bail(err);
3705+
resolve();
36543706
}
3655-
})
3656-
.on('finish', () => {
3657-
return resolve();
36583707
});
3659-
if (options.onUploadProgress) {
3660-
writable.on('progress', options.onUploadProgress);
36613708
}
3662-
writable.end(data);
36633709
});
36643710
},
36653711
{

test/file.ts

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4275,6 +4275,197 @@ describe('File', () => {
42754275
await file.save(DATA, options, assert.ifError);
42764276
});
42774277

4278+
it('should save a Readable with no errors', done => {
4279+
const options = {resumable: false};
4280+
file.createWriteStream = () => {
4281+
const writeStream = new PassThrough();
4282+
writeStream.on('data', data => {
4283+
assert.strictEqual(data.toString(), DATA);
4284+
});
4285+
writeStream.once('finish', done);
4286+
return writeStream;
4287+
};
4288+
4289+
const readable = new Readable({
4290+
read() {
4291+
this.push(DATA);
4292+
this.push(null);
4293+
},
4294+
});
4295+
4296+
void file.save(readable, options);
4297+
});
4298+
4299+
it('should propagate Readable errors', done => {
4300+
const options = {resumable: false};
4301+
file.createWriteStream = () => {
4302+
const writeStream = new PassThrough();
4303+
let errorCalled = false;
4304+
writeStream.on('data', data => {
4305+
assert.strictEqual(data.toString(), DATA);
4306+
});
4307+
writeStream.on('error', err => {
4308+
errorCalled = true;
4309+
assert.strictEqual(err.message, 'Error!');
4310+
});
4311+
writeStream.on('finish', () => {
4312+
assert.ok(errorCalled);
4313+
});
4314+
return writeStream;
4315+
};
4316+
4317+
const readable = new Readable({
4318+
read() {
4319+
setTimeout(() => {
4320+
this.push(DATA);
4321+
this.destroy(new Error('Error!'));
4322+
}, 50);
4323+
},
4324+
});
4325+
4326+
file.save(readable, options, (err: Error) => {
4327+
assert.strictEqual(err.message, 'Error!');
4328+
done();
4329+
});
4330+
});
4331+
4332+
it('Readable upload should not retry', async () => {
4333+
const options = {resumable: false};
4334+
4335+
let retryCount = 0;
4336+
4337+
file.createWriteStream = () => {
4338+
retryCount++;
4339+
return new Transform({
4340+
transform(
4341+
chunk: string | Buffer,
4342+
_encoding: string,
4343+
done: Function
4344+
) {
4345+
this.push(chunk);
4346+
setTimeout(() => {
4347+
done(new HTTPError('retryable error', 408));
4348+
}, 5);
4349+
},
4350+
});
4351+
};
4352+
try {
4353+
const readable = new Readable({
4354+
read() {
4355+
this.push(DATA);
4356+
this.push(null);
4357+
},
4358+
});
4359+
4360+
await file.save(readable, options);
4361+
throw Error('unreachable');
4362+
} catch (e) {
4363+
assert.strictEqual((e as Error).message, 'retryable error');
4364+
assert.ok(retryCount === 1);
4365+
}
4366+
});
4367+
4368+
it('Destroyed Readable upload should throw', async () => {
4369+
const options = {resumable: false};
4370+
4371+
file.createWriteStream = () => {
4372+
throw new Error('unreachable');
4373+
};
4374+
try {
4375+
const readable = new Readable({
4376+
read() {
4377+
this.push(DATA);
4378+
this.push(null);
4379+
},
4380+
});
4381+
4382+
readable.destroy();
4383+
4384+
await file.save(readable, options);
4385+
} catch (e) {
4386+
assert.strictEqual(
4387+
(e as Error).message,
4388+
FileExceptionMessages.STREAM_NOT_READABLE
4389+
);
4390+
}
4391+
});
4392+
4393+
it('should save a generator with no error', done => {
4394+
const options = {resumable: false};
4395+
file.createWriteStream = () => {
4396+
const writeStream = new PassThrough();
4397+
writeStream.on('data', data => {
4398+
assert.strictEqual(data.toString(), DATA);
4399+
done();
4400+
});
4401+
return writeStream;
4402+
};
4403+
4404+
const generator = async function* (arg?: {signal?: AbortSignal}) {
4405+
await new Promise(resolve => setTimeout(resolve, 5));
4406+
if (arg?.signal?.aborted) return;
4407+
yield DATA;
4408+
};
4409+
4410+
void file.save(generator, options);
4411+
});
4412+
4413+
it('should propagate async iterable errors', done => {
4414+
const options = {resumable: false};
4415+
file.createWriteStream = () => {
4416+
const writeStream = new PassThrough();
4417+
let errorCalled = false;
4418+
writeStream.on('data', data => {
4419+
assert.strictEqual(data.toString(), DATA);
4420+
});
4421+
writeStream.on('error', err => {
4422+
errorCalled = true;
4423+
assert.strictEqual(err.message, 'Error!');
4424+
});
4425+
writeStream.on('finish', () => {
4426+
assert.ok(errorCalled);
4427+
});
4428+
return writeStream;
4429+
};
4430+
4431+
const generator = async function* () {
4432+
yield DATA;
4433+
throw new Error('Error!');
4434+
};
4435+
4436+
file.save(generator(), options, (err: Error) => {
4437+
assert.strictEqual(err.message, 'Error!');
4438+
done();
4439+
});
4440+
});
4441+
4442+
it('should error on invalid async iterator data', done => {
4443+
const options = {resumable: false};
4444+
file.createWriteStream = () => {
4445+
const writeStream = new PassThrough();
4446+
let errorCalled = false;
4447+
writeStream.on('error', () => {
4448+
errorCalled = true;
4449+
});
4450+
writeStream.on('finish', () => {
4451+
assert.ok(errorCalled);
4452+
});
4453+
return writeStream;
4454+
};
4455+
4456+
const generator = async function* () {
4457+
yield {thisIsNot: 'a buffer or a string'};
4458+
};
4459+
4460+
file.save(generator(), options, (err: Error) => {
4461+
assert.strictEqual(
4462+
err.message,
4463+
'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object'
4464+
);
4465+
done();
4466+
});
4467+
});
4468+
42784469
it('buffer upload should retry on first failure', async () => {
42794470
const options = {
42804471
resumable: false,

0 commit comments

Comments
 (0)