| 
 | 1 | +'use strict';  | 
 | 2 | + | 
 | 3 | +const common = require('../common');  | 
 | 4 | +const { finished, addAbortSignal } = require('stream');  | 
 | 5 | +const { ReadableStream, WritableStream } = require('stream/web');  | 
 | 6 | +const assert = require('assert');  | 
 | 7 | + | 
 | 8 | +function createTestReadableStream() {  | 
 | 9 | +  return new ReadableStream({  | 
 | 10 | +    start(controller) {  | 
 | 11 | +      controller.enqueue('a');  | 
 | 12 | +      controller.enqueue('b');  | 
 | 13 | +      controller.enqueue('c');  | 
 | 14 | +      controller.close();  | 
 | 15 | +    }  | 
 | 16 | +  });  | 
 | 17 | +}  | 
 | 18 | + | 
 | 19 | +function createTestWritableStream(values) {  | 
 | 20 | +  return new WritableStream({  | 
 | 21 | +    write(chunk) {  | 
 | 22 | +      values.push(chunk);  | 
 | 23 | +    }  | 
 | 24 | +  });  | 
 | 25 | +}  | 
 | 26 | + | 
 | 27 | +{  | 
 | 28 | +  const rs = createTestReadableStream();  | 
 | 29 | + | 
 | 30 | +  const reader = rs.getReader();  | 
 | 31 | + | 
 | 32 | +  const ac = new AbortController();  | 
 | 33 | + | 
 | 34 | +  addAbortSignal(ac.signal, rs);  | 
 | 35 | + | 
 | 36 | +  finished(rs, common.mustCall((err) => {  | 
 | 37 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 38 | +    assert.rejects(reader.read(), /AbortError/).then(common.mustCall());  | 
 | 39 | +    assert.rejects(reader.closed, /AbortError/).then(common.mustCall());  | 
 | 40 | +  }));  | 
 | 41 | + | 
 | 42 | +  reader.read().then(common.mustCall((result) => {  | 
 | 43 | +    assert.strictEqual(result.value, 'a');  | 
 | 44 | +    ac.abort();  | 
 | 45 | +  }));  | 
 | 46 | +}  | 
 | 47 | + | 
 | 48 | +{  | 
 | 49 | +  const rs = createTestReadableStream();  | 
 | 50 | + | 
 | 51 | +  const ac = new AbortController();  | 
 | 52 | + | 
 | 53 | +  addAbortSignal(ac.signal, rs);  | 
 | 54 | + | 
 | 55 | +  assert.rejects((async () => {  | 
 | 56 | +    for await (const chunk of rs) {  | 
 | 57 | +      if (chunk === 'b') {  | 
 | 58 | +        ac.abort();  | 
 | 59 | +      }  | 
 | 60 | +    }  | 
 | 61 | +  })(), /AbortError/).then(common.mustCall());  | 
 | 62 | +}  | 
 | 63 | + | 
 | 64 | +{  | 
 | 65 | +  const rs1 = createTestReadableStream();  | 
 | 66 | + | 
 | 67 | +  const rs2 = createTestReadableStream();  | 
 | 68 | + | 
 | 69 | +  const ac = new AbortController();  | 
 | 70 | + | 
 | 71 | +  addAbortSignal(ac.signal, rs1);  | 
 | 72 | +  addAbortSignal(ac.signal, rs2);  | 
 | 73 | + | 
 | 74 | +  const reader1 = rs1.getReader();  | 
 | 75 | +  const reader2 = rs2.getReader();  | 
 | 76 | + | 
 | 77 | +  finished(rs1, common.mustCall((err) => {  | 
 | 78 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 79 | +    assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());  | 
 | 80 | +    assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());  | 
 | 81 | +  }));  | 
 | 82 | + | 
 | 83 | +  finished(rs2, common.mustCall((err) => {  | 
 | 84 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 85 | +    assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());  | 
 | 86 | +    assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());  | 
 | 87 | +  }));  | 
 | 88 | + | 
 | 89 | +  ac.abort();  | 
 | 90 | +}  | 
 | 91 | + | 
 | 92 | +{  | 
 | 93 | +  const rs = createTestReadableStream();  | 
 | 94 | + | 
 | 95 | +  const { 0: rs1, 1: rs2 } = rs.tee();  | 
 | 96 | + | 
 | 97 | +  const ac = new AbortController();  | 
 | 98 | + | 
 | 99 | +  addAbortSignal(ac.signal, rs);  | 
 | 100 | + | 
 | 101 | +  const reader1 = rs1.getReader();  | 
 | 102 | +  const reader2 = rs2.getReader();  | 
 | 103 | + | 
 | 104 | +  finished(rs1, common.mustCall((err) => {  | 
 | 105 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 106 | +    assert.rejects(reader1.read(), /AbortError/).then(common.mustCall());  | 
 | 107 | +    assert.rejects(reader1.closed, /AbortError/).then(common.mustCall());  | 
 | 108 | +  }));  | 
 | 109 | + | 
 | 110 | +  finished(rs2, common.mustCall((err) => {  | 
 | 111 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 112 | +    assert.rejects(reader2.read(), /AbortError/).then(common.mustCall());  | 
 | 113 | +    assert.rejects(reader2.closed, /AbortError/).then(common.mustCall());  | 
 | 114 | +  }));  | 
 | 115 | + | 
 | 116 | +  ac.abort();  | 
 | 117 | +}  | 
 | 118 | + | 
 | 119 | +{  | 
 | 120 | +  const values = [];  | 
 | 121 | +  const ws = createTestWritableStream(values);  | 
 | 122 | + | 
 | 123 | +  const ac = new AbortController();  | 
 | 124 | + | 
 | 125 | +  addAbortSignal(ac.signal, ws);  | 
 | 126 | + | 
 | 127 | +  const writer = ws.getWriter();  | 
 | 128 | + | 
 | 129 | +  finished(ws, common.mustCall((err) => {  | 
 | 130 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 131 | +    assert.deepStrictEqual(values, ['a']);  | 
 | 132 | +    assert.rejects(writer.write('b'), /AbortError/).then(common.mustCall());  | 
 | 133 | +    assert.rejects(writer.closed, /AbortError/).then(common.mustCall());  | 
 | 134 | +  }));  | 
 | 135 | + | 
 | 136 | +  writer.write('a').then(() => {  | 
 | 137 | +    ac.abort();  | 
 | 138 | +  });  | 
 | 139 | +}  | 
 | 140 | + | 
 | 141 | +{  | 
 | 142 | +  const values = [];  | 
 | 143 | + | 
 | 144 | +  const ws1 = createTestWritableStream(values);  | 
 | 145 | +  const ws2 = createTestWritableStream(values);  | 
 | 146 | + | 
 | 147 | +  const ac = new AbortController();  | 
 | 148 | + | 
 | 149 | +  addAbortSignal(ac.signal, ws1);  | 
 | 150 | +  addAbortSignal(ac.signal, ws2);  | 
 | 151 | + | 
 | 152 | +  const writer1 = ws1.getWriter();  | 
 | 153 | +  const writer2 = ws2.getWriter();  | 
 | 154 | + | 
 | 155 | +  finished(ws1, common.mustCall((err) => {  | 
 | 156 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 157 | +    assert.rejects(writer1.write('a'), /AbortError/).then(common.mustCall());  | 
 | 158 | +    assert.rejects(writer1.closed, /AbortError/).then(common.mustCall());  | 
 | 159 | +  }));  | 
 | 160 | + | 
 | 161 | +  finished(ws2, common.mustCall((err) => {  | 
 | 162 | +    assert.strictEqual(err.name, 'AbortError');  | 
 | 163 | +    assert.rejects(writer2.write('a'), /AbortError/).then(common.mustCall());  | 
 | 164 | +    assert.rejects(writer2.closed, /AbortError/).then(common.mustCall());  | 
 | 165 | +  }));  | 
 | 166 | + | 
 | 167 | +  ac.abort();  | 
 | 168 | +}  | 
0 commit comments