• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  ObjectSetPrototypeOf,
6} = primordials;
7
8const EE = require('events');
9
10function Stream(opts) {
11  EE.call(this, opts);
12}
13ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
14ObjectSetPrototypeOf(Stream, EE);
15
16Stream.prototype.pipe = function(dest, options) {
17  const source = this;
18
19  function ondata(chunk) {
20    if (dest.writable && dest.write(chunk) === false && source.pause) {
21      source.pause();
22    }
23  }
24
25  source.on('data', ondata);
26
27  function ondrain() {
28    if (source.readable && source.resume) {
29      source.resume();
30    }
31  }
32
33  dest.on('drain', ondrain);
34
35  // If the 'end' option is not supplied, dest.end() will be called when
36  // source gets the 'end' or 'close' events.  Only dest.end() once.
37  if (!dest._isStdio && (!options || options.end !== false)) {
38    source.on('end', onend);
39    source.on('close', onclose);
40  }
41
42  let didOnEnd = false;
43  function onend() {
44    if (didOnEnd) return;
45    didOnEnd = true;
46
47    dest.end();
48  }
49
50
51  function onclose() {
52    if (didOnEnd) return;
53    didOnEnd = true;
54
55    if (typeof dest.destroy === 'function') dest.destroy();
56  }
57
58  // Don't leave dangling pipes when there are errors.
59  function onerror(er) {
60    cleanup();
61    if (EE.listenerCount(this, 'error') === 0) {
62      this.emit('error', er);
63    }
64  }
65
66  prependListener(source, 'error', onerror);
67  prependListener(dest, 'error', onerror);
68
69  // Remove all the event listeners that were added.
70  function cleanup() {
71    source.removeListener('data', ondata);
72    dest.removeListener('drain', ondrain);
73
74    source.removeListener('end', onend);
75    source.removeListener('close', onclose);
76
77    source.removeListener('error', onerror);
78    dest.removeListener('error', onerror);
79
80    source.removeListener('end', cleanup);
81    source.removeListener('close', cleanup);
82
83    dest.removeListener('close', cleanup);
84  }
85
86  source.on('end', cleanup);
87  source.on('close', cleanup);
88
89  dest.on('close', cleanup);
90  dest.emit('pipe', source);
91
92  // Allow for unix-like usage: A.pipe(B).pipe(C)
93  return dest;
94};
95
96function prependListener(emitter, event, fn) {
97  // Sadly this is not cacheable as some libraries bundle their own
98  // event emitter implementation with them.
99  if (typeof emitter.prependListener === 'function')
100    return emitter.prependListener(event, fn);
101
102  // This is a hack to make sure that our error handler is attached before any
103  // userland ones.  NEVER DO THIS. This is here only because this code needs
104  // to continue to work with older versions of Node.js that do not include
105  // the prependListener() method. The goal is to eventually remove this hack.
106  if (!emitter._events || !emitter._events[event])
107    emitter.on(event, fn);
108  else if (ArrayIsArray(emitter._events[event]))
109    emitter._events[event].unshift(fn);
110  else
111    emitter._events[event] = [fn, emitter._events[event]];
112}
113
114module.exports = { Stream, prependListener };
115