• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Stream,
6  Writable,
7  Readable,
8  Transform,
9  pipeline,
10  PassThrough
11} = require('stream');
12const assert = require('assert');
13const http = require('http');
14const { promisify } = require('util');
15
16{
17  let finished = false;
18  const processed = [];
19  const expected = [
20    Buffer.from('a'),
21    Buffer.from('b'),
22    Buffer.from('c')
23  ];
24
25  const read = new Readable({
26    read() {}
27  });
28
29  const write = new Writable({
30    write(data, enc, cb) {
31      processed.push(data);
32      cb();
33    }
34  });
35
36  write.on('finish', () => {
37    finished = true;
38  });
39
40  for (let i = 0; i < expected.length; i++) {
41    read.push(expected[i]);
42  }
43  read.push(null);
44
45  pipeline(read, write, common.mustCall((err) => {
46    assert.ok(!err, 'no error');
47    assert.ok(finished);
48    assert.deepStrictEqual(processed, expected);
49  }));
50}
51
52{
53  const read = new Readable({
54    read() {}
55  });
56
57  assert.throws(() => {
58    pipeline(read, () => {});
59  }, /ERR_MISSING_ARGS/);
60  assert.throws(() => {
61    pipeline(() => {});
62  }, /ERR_MISSING_ARGS/);
63  assert.throws(() => {
64    pipeline();
65  }, /ERR_INVALID_CALLBACK/);
66}
67
68{
69  const read = new Readable({
70    read() {}
71  });
72
73  const write = new Writable({
74    write(data, enc, cb) {
75      cb();
76    }
77  });
78
79  read.push('data');
80  setImmediate(() => read.destroy());
81
82  pipeline(read, write, common.mustCall((err) => {
83    assert.ok(err, 'should have an error');
84  }));
85}
86
87{
88  const read = new Readable({
89    read() {}
90  });
91
92  const write = new Writable({
93    write(data, enc, cb) {
94      cb();
95    }
96  });
97
98  read.push('data');
99  setImmediate(() => read.destroy(new Error('kaboom')));
100
101  const dst = pipeline(read, write, common.mustCall((err) => {
102    assert.deepStrictEqual(err, new Error('kaboom'));
103  }));
104
105  assert.strictEqual(dst, write);
106}
107
108{
109  const read = new Readable({
110    read() {}
111  });
112
113  const transform = new Transform({
114    transform(data, enc, cb) {
115      cb(new Error('kaboom'));
116    }
117  });
118
119  const write = new Writable({
120    write(data, enc, cb) {
121      cb();
122    }
123  });
124
125  read.on('close', common.mustCall());
126  transform.on('close', common.mustCall());
127  write.on('close', common.mustCall());
128
129  [read, transform, write].forEach((stream) => {
130    stream.on('error', common.mustCall((err) => {
131      assert.deepStrictEqual(err, new Error('kaboom'));
132    }));
133  });
134
135  const dst = pipeline(read, transform, write, common.mustCall((err) => {
136    assert.deepStrictEqual(err, new Error('kaboom'));
137  }));
138
139  assert.strictEqual(dst, write);
140
141  read.push('hello');
142}
143
144{
145  const server = http.createServer((req, res) => {
146    const rs = new Readable({
147      read() {
148        rs.push('hello');
149        rs.push(null);
150      }
151    });
152
153    pipeline(rs, res, () => {});
154  });
155
156  server.listen(0, () => {
157    const req = http.request({
158      port: server.address().port
159    });
160
161    req.end();
162    req.on('response', (res) => {
163      const buf = [];
164      res.on('data', (data) => buf.push(data));
165      res.on('end', common.mustCall(() => {
166        assert.deepStrictEqual(
167          Buffer.concat(buf),
168          Buffer.from('hello')
169        );
170        server.close();
171      }));
172    });
173  });
174}
175
176{
177  const server = http.createServer((req, res) => {
178    let sent = false;
179    const rs = new Readable({
180      read() {
181        if (sent) {
182          return;
183        }
184        sent = true;
185        rs.push('hello');
186      },
187      destroy: common.mustCall((err, cb) => {
188        // Prevents fd leaks by destroying http pipelines
189        cb();
190      })
191    });
192
193    pipeline(rs, res, () => {});
194  });
195
196  server.listen(0, () => {
197    const req = http.request({
198      port: server.address().port
199    });
200
201    req.end();
202    req.on('response', (res) => {
203      setImmediate(() => {
204        res.destroy();
205        server.close();
206      });
207    });
208  });
209}
210
211{
212  const server = http.createServer((req, res) => {
213    let sent = 0;
214    const rs = new Readable({
215      read() {
216        if (sent++ > 10) {
217          return;
218        }
219        rs.push('hello');
220      },
221      destroy: common.mustCall((err, cb) => {
222        cb();
223      })
224    });
225
226    pipeline(rs, res, () => {});
227  });
228
229  let cnt = 10;
230
231  const badSink = new Writable({
232    write(data, enc, cb) {
233      cnt--;
234      if (cnt === 0) cb(new Error('kaboom'));
235      else cb();
236    }
237  });
238
239  server.listen(0, () => {
240    const req = http.request({
241      port: server.address().port
242    });
243
244    req.end();
245    req.on('response', (res) => {
246      pipeline(res, badSink, common.mustCall((err) => {
247        assert.deepStrictEqual(err, new Error('kaboom'));
248        server.close();
249      }));
250    });
251  });
252}
253
254{
255  const server = http.createServer((req, res) => {
256    pipeline(req, res, common.mustCall());
257  });
258
259  server.listen(0, () => {
260    const req = http.request({
261      port: server.address().port
262    });
263
264    let sent = 0;
265    const rs = new Readable({
266      read() {
267        if (sent++ > 10) {
268          return;
269        }
270        rs.push('hello');
271      }
272    });
273
274    pipeline(rs, req, common.mustCall(() => {
275      server.close();
276    }));
277
278    req.on('response', (res) => {
279      let cnt = 10;
280      res.on('data', () => {
281        cnt--;
282        if (cnt === 0) rs.destroy();
283      });
284    });
285  });
286}
287
288{
289  const makeTransform = () => {
290    const tr = new Transform({
291      transform(data, enc, cb) {
292        cb(null, data);
293      }
294    });
295
296    tr.on('close', common.mustCall());
297    return tr;
298  };
299
300  const rs = new Readable({
301    read() {
302      rs.push('hello');
303    }
304  });
305
306  let cnt = 10;
307
308  const ws = new Writable({
309    write(data, enc, cb) {
310      cnt--;
311      if (cnt === 0) return cb(new Error('kaboom'));
312      cb();
313    }
314  });
315
316  rs.on('close', common.mustCall());
317  ws.on('close', common.mustCall());
318
319  pipeline(
320    rs,
321    makeTransform(),
322    makeTransform(),
323    makeTransform(),
324    makeTransform(),
325    makeTransform(),
326    makeTransform(),
327    ws,
328    common.mustCall((err) => {
329      assert.deepStrictEqual(err, new Error('kaboom'));
330    })
331  );
332}
333
334{
335  const oldStream = new Stream();
336
337  oldStream.pause = oldStream.resume = () => {};
338  oldStream.write = (data) => {
339    oldStream.emit('data', data);
340    return true;
341  };
342  oldStream.end = () => {
343    oldStream.emit('end');
344  };
345
346  const expected = [
347    Buffer.from('hello'),
348    Buffer.from('world')
349  ];
350
351  const rs = new Readable({
352    read() {
353      for (let i = 0; i < expected.length; i++) {
354        rs.push(expected[i]);
355      }
356      rs.push(null);
357    }
358  });
359
360  const ws = new Writable({
361    write(data, enc, cb) {
362      assert.deepStrictEqual(data, expected.shift());
363      cb();
364    }
365  });
366
367  let finished = false;
368
369  ws.on('finish', () => {
370    finished = true;
371  });
372
373  pipeline(
374    rs,
375    oldStream,
376    ws,
377    common.mustCall((err) => {
378      assert(!err, 'no error');
379      assert(finished, 'last stream finished');
380    })
381  );
382}
383
384{
385  const oldStream = new Stream();
386
387  oldStream.pause = oldStream.resume = () => {};
388  oldStream.write = (data) => {
389    oldStream.emit('data', data);
390    return true;
391  };
392  oldStream.end = () => {
393    oldStream.emit('end');
394  };
395
396  const destroyableOldStream = new Stream();
397
398  destroyableOldStream.pause = destroyableOldStream.resume = () => {};
399  destroyableOldStream.destroy = common.mustCall(() => {
400    destroyableOldStream.emit('close');
401  });
402  destroyableOldStream.write = (data) => {
403    destroyableOldStream.emit('data', data);
404    return true;
405  };
406  destroyableOldStream.end = () => {
407    destroyableOldStream.emit('end');
408  };
409
410  const rs = new Readable({
411    read() {
412      rs.destroy(new Error('stop'));
413    }
414  });
415
416  const ws = new Writable({
417    write(data, enc, cb) {
418      cb();
419    }
420  });
421
422  let finished = false;
423
424  ws.on('finish', () => {
425    finished = true;
426  });
427
428  pipeline(
429    rs,
430    oldStream,
431    destroyableOldStream,
432    ws,
433    common.mustCall((err) => {
434      assert.deepStrictEqual(err, new Error('stop'));
435      assert(!finished, 'should not finish');
436    })
437  );
438}
439
440{
441  const pipelinePromise = promisify(pipeline);
442
443  async function run() {
444    const read = new Readable({
445      read() {}
446    });
447
448    const write = new Writable({
449      write(data, enc, cb) {
450        cb();
451      }
452    });
453
454    read.push('data');
455    read.push(null);
456
457    let finished = false;
458
459    write.on('finish', () => {
460      finished = true;
461    });
462
463    await pipelinePromise(read, write);
464
465    assert(finished);
466  }
467
468  run();
469}
470
471{
472  const read = new Readable({
473    read() {}
474  });
475
476  const transform = new Transform({
477    transform(data, enc, cb) {
478      cb(new Error('kaboom'));
479    }
480  });
481
482  const write = new Writable({
483    write(data, enc, cb) {
484      cb();
485    }
486  });
487
488  assert.throws(
489    () => pipeline(read, transform, write),
490    { code: 'ERR_INVALID_CALLBACK' }
491  );
492}
493
494{
495  const server = http.Server(function(req, res) {
496    res.write('asd');
497  });
498  server.listen(0, function() {
499    http.get({ port: this.address().port }, (res) => {
500      const stream = new PassThrough();
501
502      // NOTE: 2 because Node 12 streams can emit 'error'
503      // multiple times.
504      stream.on('error', common.mustCall(2));
505
506      pipeline(
507        res,
508        stream,
509        common.mustCall((err) => {
510          assert.ok(err);
511          // TODO(ronag):
512          // assert.strictEqual(err.message, 'oh no');
513          server.close();
514        })
515      );
516
517      stream.destroy(new Error('oh no'));
518    }).on('error', common.mustNotCall());
519  });
520}
521
522{
523  const r = new Readable({
524    read() {}
525  });
526  r.push('hello');
527  r.push('world');
528  r.push(null);
529  let res = '';
530  const w = new Writable({
531    write(chunk, encoding, callback) {
532      res += chunk;
533      callback();
534    }
535  });
536  pipeline([r, w], common.mustCall((err) => {
537    assert.ok(!err);
538    assert.strictEqual(res, 'helloworld');
539  }));
540}
541