5
5
*/
6
6
7
7
const EventEmitter = require ( 'events' ) . EventEmitter ;
8
+ const MongooseError = require ( '../error/mongooseError' ) ;
8
9
9
10
/*!
10
11
* ignore
@@ -25,6 +26,7 @@ class ChangeStream extends EventEmitter {
25
26
this . bindedEvents = false ;
26
27
this . pipeline = pipeline ;
27
28
this . options = options ;
29
+ this . errored = false ;
28
30
29
31
if ( options && options . hydrate && ! options . model ) {
30
32
throw new Error (
@@ -33,19 +35,36 @@ class ChangeStream extends EventEmitter {
33
35
) ;
34
36
}
35
37
38
+ let syncError = null ;
36
39
this . $driverChangeStreamPromise = new Promise ( ( resolve , reject ) => {
37
40
// This wrapper is necessary because of buffering.
38
- changeStreamThunk ( ( err , driverChangeStream ) => {
39
- if ( err != null ) {
40
- this . emit ( 'error' , err ) ;
41
- return reject ( err ) ;
42
- }
41
+ try {
42
+ changeStreamThunk ( ( err , driverChangeStream ) => {
43
+ if ( err != null ) {
44
+ this . errored = true ;
45
+ this . emit ( 'error' , err ) ;
46
+ return reject ( err ) ;
47
+ }
43
48
44
- this . driverChangeStream = driverChangeStream ;
45
- this . emit ( 'ready' ) ;
46
- resolve ( ) ;
47
- } ) ;
49
+ this . driverChangeStream = driverChangeStream ;
50
+ this . emit ( 'ready' ) ;
51
+ resolve ( ) ;
52
+ } ) ;
53
+ } catch ( err ) {
54
+ syncError = err ;
55
+ this . errored = true ;
56
+ this . emit ( 'error' , err ) ;
57
+ reject ( err ) ;
58
+ }
48
59
} ) ;
60
+
61
+ // Because a ChangeStream is an event emitter, there's no way to register an 'error' handler
62
+ // that catches errors which occur in the constructor, unless we force sync errors into async
63
+ // errors with setImmediate(). For cleaner stack trace, we just immediately throw any synchronous
64
+ // errors that occurred with changeStreamThunk().
65
+ if ( syncError != null ) {
66
+ throw syncError ;
67
+ }
49
68
}
50
69
51
70
_bindEvents ( ) {
@@ -92,10 +111,16 @@ class ChangeStream extends EventEmitter {
92
111
}
93
112
94
113
hasNext ( cb ) {
114
+ if ( this . errored ) {
115
+ throw new MongooseError ( 'Cannot call hasNext() on errored ChangeStream' ) ;
116
+ }
95
117
return this . driverChangeStream . hasNext ( cb ) ;
96
118
}
97
119
98
120
next ( cb ) {
121
+ if ( this . errored ) {
122
+ throw new MongooseError ( 'Cannot call next() on errored ChangeStream' ) ;
123
+ }
99
124
if ( this . options && this . options . hydrate ) {
100
125
if ( cb != null ) {
101
126
const originalCb = cb ;
@@ -126,16 +151,25 @@ class ChangeStream extends EventEmitter {
126
151
}
127
152
128
153
addListener ( event , handler ) {
154
+ if ( this . errored ) {
155
+ throw new MongooseError ( 'Cannot call addListener() on errored ChangeStream' ) ;
156
+ }
129
157
this . _bindEvents ( ) ;
130
158
return super . addListener ( event , handler ) ;
131
159
}
132
160
133
161
on ( event , handler ) {
162
+ if ( this . errored ) {
163
+ throw new MongooseError ( 'Cannot call on() on errored ChangeStream' ) ;
164
+ }
134
165
this . _bindEvents ( ) ;
135
166
return super . on ( event , handler ) ;
136
167
}
137
168
138
169
once ( event , handler ) {
170
+ if ( this . errored ) {
171
+ throw new MongooseError ( 'Cannot call once() on errored ChangeStream' ) ;
172
+ }
139
173
this . _bindEvents ( ) ;
140
174
return super . once ( event , handler ) ;
141
175
}
0 commit comments