Skip to content

Commit 495aa7a

Browse files
committed
stream: cleanup async handling
Cleanup async stream method handling.
1 parent a5ba28d commit 495aa7a

File tree

3 files changed

+67
-106
lines changed

3 files changed

+67
-106
lines changed

lib/internal/streams/destroy.js

Lines changed: 30 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,16 @@ function destroy(err, cb) {
7070

7171
function _destroy(self, err, cb) {
7272
let called = false;
73-
const result = self._destroy(err || null, (err) => {
74-
const r = self._readableState;
75-
const w = self._writableState;
7673

74+
function onDestroy(err) {
75+
if (called) {
76+
return;
77+
}
7778
called = true;
7879

80+
const r = self._readableState;
81+
const w = self._writableState;
82+
7983
checkError(err, w, r);
8084

8185
if (w) {
@@ -94,59 +98,19 @@ function _destroy(self, err, cb) {
9498
} else {
9599
process.nextTick(emitCloseNT, self);
96100
}
97-
});
98-
if (result !== undefined && result !== null) {
101+
}
102+
const result = self._destroy(err || null, onDestroy);
103+
if (result != null) {
99104
try {
100105
const then = result.then;
101106
if (typeof then === 'function') {
102107
then.call(
103108
result,
104109
function() {
105-
if (called)
106-
return;
107-
108-
const r = self._readableState;
109-
const w = self._writableState;
110-
111-
if (w) {
112-
w.closed = true;
113-
}
114-
if (r) {
115-
r.closed = true;
116-
}
117-
118-
if (typeof cb === 'function') {
119-
process.nextTick(cb);
120-
}
121-
122-
process.nextTick(emitCloseNT, self);
110+
process.nextTick(onDestroy, null);
123111
},
124112
function(err) {
125-
const r = self._readableState;
126-
const w = self._writableState;
127-
err.stack; // eslint-disable-line no-unused-expressions
128-
129-
called = true;
130-
131-
if (w && !w.errored) {
132-
w.errored = err;
133-
}
134-
if (r && !r.errored) {
135-
r.errored = err;
136-
}
137-
138-
if (w) {
139-
w.closed = true;
140-
}
141-
if (r) {
142-
r.closed = true;
143-
}
144-
145-
if (typeof cb === 'function') {
146-
process.nextTick(cb, err);
147-
}
148-
149-
process.nextTick(emitErrorCloseNT, self, err);
113+
process.nextTick(onDestroy, err);
150114
});
151115
}
152116
} catch (err) {
@@ -285,69 +249,47 @@ function construct(stream, cb) {
285249
}
286250

287251
function constructNT(stream) {
288-
const r = stream._readableState;
289-
const w = stream._writableState;
290-
// With duplex streams we use the writable side for state.
291-
const s = w || r;
292-
293252
let called = false;
294-
const result = stream._construct((err) => {
253+
254+
function onConstruct(err) {
255+
if (called) {
256+
errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
257+
return;
258+
}
259+
called = true;
260+
261+
const r = stream._readableState;
262+
const w = stream._writableState;
263+
const s = w || r;
264+
295265
if (r) {
296266
r.constructed = true;
297267
}
298268
if (w) {
299269
w.constructed = true;
300270
}
301271

302-
if (called) {
303-
err = new ERR_MULTIPLE_CALLBACK();
304-
} else {
305-
called = true;
306-
}
307-
308272
if (s.destroyed) {
309273
stream.emit(kDestroy, err);
310274
} else if (err) {
311275
errorOrDestroy(stream, err, true);
312276
} else {
313277
process.nextTick(emitConstructNT, stream);
314278
}
315-
});
316-
if (result !== undefined && result !== null) {
279+
}
280+
281+
const result = stream._construct(onConstruct);
282+
if (result != null) {
317283
try {
318284
const then = result.then;
319285
if (typeof then === 'function') {
320286
then.call(
321287
result,
322288
function() {
323-
// If the callback was invoked, do nothing further.
324-
if (called)
325-
return;
326-
if (r) {
327-
r.constructed = true;
328-
}
329-
if (w) {
330-
w.constructed = true;
331-
}
332-
if (s.destroyed) {
333-
process.nextTick(() => stream.emit(kDestroy));
334-
} else {
335-
process.nextTick(emitConstructNT, stream);
336-
}
289+
process.nextTick(onConstruct, null);
337290
},
338291
function(err) {
339-
if (r) {
340-
r.constructed = true;
341-
}
342-
if (w) {
343-
w.constructed = true;
344-
}
345-
called = true;
346-
if (s.destroyed) {
347-
process.nextTick(() => stream.emit(kDestroy, err));
348-
} else {
349-
process.nextTick(errorOrDestroy, stream, err);
350-
}
292+
process.nextTick(onConstruct, err);
351293
});
352294
}
353295
} catch (err) {

lib/internal/streams/readable.js

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -483,8 +483,25 @@ Readable.prototype.read = function(n) {
483483
// If the length is currently zero, then we *need* a readable event.
484484
if (state.length === 0)
485485
state.needReadable = true;
486+
486487
// Call internal read method
487-
this._read(state.highWaterMark);
488+
const result = this._read(state.highWaterMark);
489+
if (result != null) {
490+
try {
491+
const then = result.then;
492+
if (typeof then === 'function') {
493+
then.call(
494+
result,
495+
nop,
496+
function(err) {
497+
errorOrDestroy(this, err);
498+
});
499+
}
500+
} catch (err) {
501+
errorOrDestroy(this, err);
502+
}
503+
}
504+
488505
state.sync = false;
489506
// If _read pushed data synchronously, then `reading` will be false,
490507
// and we need to re-evaluate how much data we can return to the user.

lib/internal/streams/writable.js

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -658,9 +658,15 @@ function needFinish(state) {
658658
}
659659

660660
function callFinal(stream, state) {
661-
state.sync = true;
662-
state.pendingcb++;
663-
const result = stream._final((err) => {
661+
let called = false;
662+
663+
function onFinish(err) {
664+
if (called) {
665+
errorOrDestroy(stream, err ?? ERR_MULTIPLE_CALLBACK());
666+
return;
667+
}
668+
called = true;
669+
664670
state.pendingcb--;
665671
if (err) {
666672
const onfinishCallbacks = state[kOnFinished].splice(0);
@@ -677,31 +683,27 @@ function callFinal(stream, state) {
677683
state.pendingcb++;
678684
process.nextTick(finish, stream, state);
679685
}
680-
});
681-
if (result !== undefined && result !== null) {
686+
}
687+
688+
state.sync = true;
689+
state.pendingcb++;
690+
const result = stream._final(onFinish);
691+
692+
if (result != null) {
682693
try {
683694
const then = result.then;
684695
if (typeof then === 'function') {
685696
then.call(
686697
result,
687698
function() {
688-
if (state.prefinished)
689-
return;
690-
state.prefinish = true;
691-
process.nextTick(() => stream.emit('prefinish'));
692-
state.pendingcb++;
693-
process.nextTick(finish, stream, state);
699+
process.nextTick(onFinish, null);
694700
},
695701
function(err) {
696-
const onfinishCallbacks = state[kOnFinished].splice(0);
697-
for (let i = 0; i < onfinishCallbacks.length; i++) {
698-
process.nextTick(onfinishCallbacks[i], err);
699-
}
700-
process.nextTick(errorOrDestroy, stream, err, state.sync);
702+
process.nextTick(onFinish, err);
701703
});
702704
}
703705
} catch (err) {
704-
process.nextTick(errorOrDestroy, stream, err, state.sync);
706+
onFinish(stream, state, err);
705707
}
706708
}
707709
state.sync = false;

0 commit comments

Comments
 (0)