Skip to content

Commit 49327ff

Browse files
authored
Revert "feat!: support iterables in file#save (#2202) (#2203)" (#2270)
This reverts commit c0d9d58.
1 parent c0d9d58 commit 49327ff

File tree

2 files changed

+25
-262
lines changed

2 files changed

+25
-262
lines changed

src/file.ts

Lines changed: 25 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,7 @@ import * as crypto from 'crypto';
2929
import * as fs from 'fs';
3030
import * as mime from 'mime';
3131
import * as resumableUpload from './resumable-upload';
32-
import {
33-
Writable,
34-
Readable,
35-
pipeline,
36-
Transform,
37-
PassThrough,
38-
PipelineSource,
39-
} from 'stream';
32+
import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream';
4033
import * as zlib from 'zlib';
4134
import * as http from 'http';
4235

@@ -100,8 +93,6 @@ export interface PolicyDocument {
10093
signature: string;
10194
}
10295

103-
export type SaveData = string | Buffer | PipelineSource<string | Buffer>;
104-
10596
export type GenerateSignedPostPolicyV2Response = [PolicyDocument];
10697

10798
export interface GenerateSignedPostPolicyV2Callback {
@@ -463,7 +454,6 @@ export enum FileExceptionMessages {
463454
UPLOAD_MISMATCH = `The uploaded data did not match the data from the server.
464455
As a precaution, the file has been deleted.
465456
To be sure the content is the same, you should try uploading the file again.`,
466-
STREAM_NOT_READABLE = 'Stream must be readable.',
467457
}
468458

469459
/**
@@ -3567,9 +3557,13 @@ class File extends ServiceObject<File, FileMetadata> {
35673557
this.copy(newFile, copyOptions, callback!);
35683558
}
35693559

3570-
save(data: SaveData, options?: SaveOptions): Promise<void>;
3571-
save(data: SaveData, callback: SaveCallback): void;
3572-
save(data: SaveData, options: SaveOptions, callback: SaveCallback): void;
3560+
save(data: string | Buffer, options?: SaveOptions): Promise<void>;
3561+
save(data: string | Buffer, callback: SaveCallback): void;
3562+
save(
3563+
data: string | Buffer,
3564+
options: SaveOptions,
3565+
callback: SaveCallback
3566+
): void;
35733567
/**
35743568
* @typedef {object} SaveOptions
35753569
* @extends CreateWriteStreamOptions
@@ -3596,7 +3590,7 @@ class File extends ServiceObject<File, FileMetadata> {
35963590
* resumable feature is disabled.
35973591
* </p>
35983592
*
3599-
* @param {SaveData} data The data to write to a file.
3593+
* @param {string | Buffer} data The data to write to a file.
36003594
* @param {SaveOptions} [options] See {@link File#createWriteStream}'s `options`
36013595
* parameter.
36023596
* @param {SaveCallback} [callback] Callback function.
@@ -3624,7 +3618,7 @@ class File extends ServiceObject<File, FileMetadata> {
36243618
* ```
36253619
*/
36263620
save(
3627-
data: SaveData,
3621+
data: string | Buffer,
36283622
optionsOrCallback?: SaveOptions | SaveCallback,
36293623
callback?: SaveCallback
36303624
): Promise<void> | void {
@@ -3644,68 +3638,28 @@ class File extends ServiceObject<File, FileMetadata> {
36443638
}
36453639
const returnValue = retry(
36463640
async (bail: (err: Error) => void) => {
3647-
if (data instanceof Readable) {
3648-
// Make sure any pending async readable operations are finished before
3649-
// attempting to check if the stream is readable.
3650-
await new Promise(resolve => setImmediate(resolve));
3651-
3652-
if (!data.readable || data.destroyed) {
3653-
// Calling pipeline() with a non-readable stream will result in the
3654-
// callback being called without an error, and no piping taking
3655-
// place. In that case, file.save() would appear to succeed, but
3656-
// nothing would be uploaded.
3657-
return bail(new Error(FileExceptionMessages.STREAM_NOT_READABLE));
3658-
}
3659-
}
3660-
3661-
return new Promise<void>((resolve, reject) => {
3641+
await new Promise<void>((resolve, reject) => {
36623642
if (maxRetries === 0) {
36633643
this.storage.retryOptions.autoRetry = false;
36643644
}
3665-
const writable = this.createWriteStream(options);
3666-
3667-
if (options.onUploadProgress) {
3668-
writable.on('progress', options.onUploadProgress);
3669-
}
3670-
3671-
const handleError = (err: Error) => {
3672-
if (
3673-
!this.storage.retryOptions.autoRetry ||
3674-
!this.storage.retryOptions.retryableErrorFn!(err)
3675-
) {
3676-
bail(err);
3677-
}
3678-
3679-
reject(err);
3680-
};
3681-
3682-
if (typeof data === 'string' || Buffer.isBuffer(data)) {
3683-
writable
3684-
.on('error', handleError)
3685-
.on('finish', () => resolve())
3686-
.end(data);
3687-
} else {
3688-
pipeline(data, writable, err => {
3689-
if (err) {
3690-
// If data is not a valid PipelineSource, then pipeline will
3691-
// fail without destroying the writable stream. If data is a
3692-
// PipelineSource that yields invalid chunks (e.g. a stream in
3693-
// object mode or an iterable that does not yield Buffers or
3694-
// strings), then pipeline will destroy the writable stream.
3695-
if (!writable.destroyed) writable.destroy();
3696-
3697-
if (typeof data !== 'function') {
3698-
// Only PipelineSourceFunction can be retried. Async-iterables
3699-
// and Readable streams can only be consumed once.
3700-
bail(err);
3701-
}
3702-
3703-
handleError(err);
3645+
const writable = this.createWriteStream(options)
3646+
.on('error', err => {
3647+
if (
3648+
this.storage.retryOptions.autoRetry &&
3649+
this.storage.retryOptions.retryableErrorFn!(err)
3650+
) {
3651+
return reject(err);
37043652
} else {
3705-
resolve();
3653+
return bail(err);
37063654
}
3655+
})
3656+
.on('finish', () => {
3657+
return resolve();
37073658
});
3659+
if (options.onUploadProgress) {
3660+
writable.on('progress', options.onUploadProgress);
37083661
}
3662+
writable.end(data);
37093663
});
37103664
},
37113665
{

test/file.ts

Lines changed: 0 additions & 191 deletions
Original file line numberDiff line numberDiff line change
@@ -4275,197 +4275,6 @@ describe('File', () => {
42754275
await file.save(DATA, options, assert.ifError);
42764276
});
42774277

4278-
it('should save a Readable with no errors', done => {
4279-
const options = {resumable: false};
4280-
file.createWriteStream = () => {
4281-
const writeStream = new PassThrough();
4282-
writeStream.on('data', data => {
4283-
assert.strictEqual(data.toString(), DATA);
4284-
});
4285-
writeStream.once('finish', done);
4286-
return writeStream;
4287-
};
4288-
4289-
const readable = new Readable({
4290-
read() {
4291-
this.push(DATA);
4292-
this.push(null);
4293-
},
4294-
});
4295-
4296-
void file.save(readable, options);
4297-
});
4298-
4299-
it('should propagate Readable errors', done => {
4300-
const options = {resumable: false};
4301-
file.createWriteStream = () => {
4302-
const writeStream = new PassThrough();
4303-
let errorCalled = false;
4304-
writeStream.on('data', data => {
4305-
assert.strictEqual(data.toString(), DATA);
4306-
});
4307-
writeStream.on('error', err => {
4308-
errorCalled = true;
4309-
assert.strictEqual(err.message, 'Error!');
4310-
});
4311-
writeStream.on('finish', () => {
4312-
assert.ok(errorCalled);
4313-
});
4314-
return writeStream;
4315-
};
4316-
4317-
const readable = new Readable({
4318-
read() {
4319-
setTimeout(() => {
4320-
this.push(DATA);
4321-
this.destroy(new Error('Error!'));
4322-
}, 50);
4323-
},
4324-
});
4325-
4326-
file.save(readable, options, (err: Error) => {
4327-
assert.strictEqual(err.message, 'Error!');
4328-
done();
4329-
});
4330-
});
4331-
4332-
it('Readable upload should not retry', async () => {
4333-
const options = {resumable: false};
4334-
4335-
let retryCount = 0;
4336-
4337-
file.createWriteStream = () => {
4338-
retryCount++;
4339-
return new Transform({
4340-
transform(
4341-
chunk: string | Buffer,
4342-
_encoding: string,
4343-
done: Function
4344-
) {
4345-
this.push(chunk);
4346-
setTimeout(() => {
4347-
done(new HTTPError('retryable error', 408));
4348-
}, 5);
4349-
},
4350-
});
4351-
};
4352-
try {
4353-
const readable = new Readable({
4354-
read() {
4355-
this.push(DATA);
4356-
this.push(null);
4357-
},
4358-
});
4359-
4360-
await file.save(readable, options);
4361-
throw Error('unreachable');
4362-
} catch (e) {
4363-
assert.strictEqual((e as Error).message, 'retryable error');
4364-
assert.ok(retryCount === 1);
4365-
}
4366-
});
4367-
4368-
it('Destroyed Readable upload should throw', async () => {
4369-
const options = {resumable: false};
4370-
4371-
file.createWriteStream = () => {
4372-
throw new Error('unreachable');
4373-
};
4374-
try {
4375-
const readable = new Readable({
4376-
read() {
4377-
this.push(DATA);
4378-
this.push(null);
4379-
},
4380-
});
4381-
4382-
readable.destroy();
4383-
4384-
await file.save(readable, options);
4385-
} catch (e) {
4386-
assert.strictEqual(
4387-
(e as Error).message,
4388-
FileExceptionMessages.STREAM_NOT_READABLE
4389-
);
4390-
}
4391-
});
4392-
4393-
it('should save a generator with no error', done => {
4394-
const options = {resumable: false};
4395-
file.createWriteStream = () => {
4396-
const writeStream = new PassThrough();
4397-
writeStream.on('data', data => {
4398-
assert.strictEqual(data.toString(), DATA);
4399-
done();
4400-
});
4401-
return writeStream;
4402-
};
4403-
4404-
const generator = async function* (arg?: {signal?: AbortSignal}) {
4405-
await new Promise(resolve => setTimeout(resolve, 5));
4406-
if (arg?.signal?.aborted) return;
4407-
yield DATA;
4408-
};
4409-
4410-
void file.save(generator, options);
4411-
});
4412-
4413-
it('should propagate async iterable errors', done => {
4414-
const options = {resumable: false};
4415-
file.createWriteStream = () => {
4416-
const writeStream = new PassThrough();
4417-
let errorCalled = false;
4418-
writeStream.on('data', data => {
4419-
assert.strictEqual(data.toString(), DATA);
4420-
});
4421-
writeStream.on('error', err => {
4422-
errorCalled = true;
4423-
assert.strictEqual(err.message, 'Error!');
4424-
});
4425-
writeStream.on('finish', () => {
4426-
assert.ok(errorCalled);
4427-
});
4428-
return writeStream;
4429-
};
4430-
4431-
const generator = async function* () {
4432-
yield DATA;
4433-
throw new Error('Error!');
4434-
};
4435-
4436-
file.save(generator(), options, (err: Error) => {
4437-
assert.strictEqual(err.message, 'Error!');
4438-
done();
4439-
});
4440-
});
4441-
4442-
it('should error on invalid async iterator data', done => {
4443-
const options = {resumable: false};
4444-
file.createWriteStream = () => {
4445-
const writeStream = new PassThrough();
4446-
let errorCalled = false;
4447-
writeStream.on('error', () => {
4448-
errorCalled = true;
4449-
});
4450-
writeStream.on('finish', () => {
4451-
assert.ok(errorCalled);
4452-
});
4453-
return writeStream;
4454-
};
4455-
4456-
const generator = async function* () {
4457-
yield {thisIsNot: 'a buffer or a string'};
4458-
};
4459-
4460-
file.save(generator(), options, (err: Error) => {
4461-
assert.strictEqual(
4462-
err.message,
4463-
'The "chunk" argument must be of type string or an instance of Buffer or Uint8Array. Received an instance of Object'
4464-
);
4465-
done();
4466-
});
4467-
});
4468-
44694278
it('buffer upload should retry on first failure', async () => {
44704279
const options = {
44714280
resumable: false,

0 commit comments

Comments
 (0)