// Code-viewer navigation (not part of the source): Home · Line# · Scopes# · Navigate# · Raw · Download
'use strict';

// `primordials` is not declared in this file; it is expected to be supplied
// by the surrounding module loader.
const {
  ObjectSetPrototypeOf,
} = primordials;

// Base class for Stream: provides on/emit/removeListener and friends.
const EE = require('events');

/**
 * Legacy stream base constructor.
 *
 * Does nothing beyond running the EventEmitter constructor on `this`, so
 * the instance is set up as an event emitter.
 *
 * @param {object} [opts] Forwarded unchanged to the EventEmitter constructor.
 */
function Stream(opts) {
  EE.call(this, opts);
}
// Classic (pre-`class`) inheritance wiring. `ObjectSetPrototypeOf` comes from
// `primordials` (presumably a captured Object.setPrototypeOf - confirm).
// Instances inherit EventEmitter's prototype methods...
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
// ...and the constructor itself inherits EventEmitter's static properties.
ObjectSetPrototypeOf(Stream, EE);

/**
 * Forwards every 'data' chunk emitted by this stream to `dest`, with simple
 * pause/resume back-pressure, and tears the wiring down again when either
 * side ends, closes, or errors.
 *
 * @param {object} dest Destination emitter. `write()` is called only while
 *   `dest.writable` is truthy; `end()` and (if present) `destroy()` are used
 *   to finish it; `dest._isStdio` disables automatic ending.
 * @param {object} [options]
 * @param {boolean} [options.end] Pass `false` to keep `dest` open when the
 *   source ends or closes. Any other value (or no options) ends it.
 * @returns {object} `dest`, so calls can be chained: `a.pipe(b).pipe(c)`.
 * @throws Rethrows an 'error' emitted by either stream when that stream has
 *   no other 'error' listener registered.
 */
Stream.prototype.pipe = function(dest, options) {
  const source = this;

  // Write each chunk through; if the destination reports it is full
  // (write() returned false) pause the source, when it supports pausing.
  function ondata(chunk) {
    if (dest.writable && dest.write(chunk) === false && source.pause) {
      source.pause();
    }
  }

  source.on('data', ondata);

  // The destination has room again: restart a source that is still readable
  // and supports resuming.
  function ondrain() {
    if (source.readable && source.resume) {
      source.resume();
    }
  }

  dest.on('drain', ondrain);

  // If the 'end' option is not supplied, dest.end() will be called when
  // source gets the 'end' or 'close' events.  Only dest.end() once.
  if (!dest._isStdio && (!options || options.end !== false)) {
    source.on('end', onend);
    source.on('close', onclose);
  }

  // Shared latch: whichever of onend/onclose fires first wins, the other
  // becomes a no-op.
  let didOnEnd = false;
  function onend() {
    if (didOnEnd) return;
    didOnEnd = true;

    dest.end();
  }


  // Source closed without a prior 'end': destroy the destination instead of
  // ending it, when it offers destroy().
  function onclose() {
    if (didOnEnd) return;
    didOnEnd = true;

    if (typeof dest.destroy === 'function') dest.destroy();
  }

  // Don't leave dangling pipes when there are errors.
  // `this` is whichever stream emitted the error. cleanup() has already
  // removed this handler, so a count of zero means nobody else is listening
  // and the error must not be swallowed.
  function onerror(er) {
    cleanup();
    if (EE.listenerCount(this, 'error') === 0) {
      throw er; // Unhandled stream error in pipe.
    }
  }

  source.on('error', onerror);
  dest.on('error', onerror);

  // Remove all the event listeners that were added.
  function cleanup() {
    source.removeListener('data', ondata);
    dest.removeListener('drain', ondrain);

    source.removeListener('end', onend);
    source.removeListener('close', onclose);

    source.removeListener('error', onerror);
    dest.removeListener('error', onerror);

    source.removeListener('end', cleanup);
    source.removeListener('close', cleanup);

    dest.removeListener('close', cleanup);
  }

  // Registered after onend/onclose so those run before the wiring is removed.
  source.on('end', cleanup);
  source.on('close', cleanup);

  dest.on('close', cleanup);
  dest.emit('pipe', source);

  // Allow for unix-like usage: A.pipe(B).pipe(C)
  return dest;
};

// The legacy Stream constructor is this module's sole export.
module.exports = Stream;
