• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"use strict";
2
3var stream = require("stream");
4
5function DuplexWrapper(options, writable, readable) {
6  if (typeof readable === "undefined") {
7    readable = writable;
8    writable = options;
9    options = null;
10  }
11
12  stream.Duplex.call(this, options);
13
14  if (typeof readable.read !== "function") {
15    readable = (new stream.Readable(options)).wrap(readable);
16  }
17
18  this._writable = writable;
19  this._readable = readable;
20  this._waiting = false;
21
22  var self = this;
23
24  writable.once("finish", function() {
25    self.end();
26  });
27
28  this.once("finish", function() {
29    writable.end();
30  });
31
32  readable.on("readable", function() {
33    if (self._waiting) {
34      self._waiting = false;
35      self._read();
36    }
37  });
38
39  readable.once("end", function() {
40    self.push(null);
41  });
42
43  if (!options || typeof options.bubbleErrors === "undefined" || options.bubbleErrors) {
44    writable.on("error", function(err) {
45      self.emit("error", err);
46    });
47
48    readable.on("error", function(err) {
49      self.emit("error", err);
50    });
51  }
52}
53
54DuplexWrapper.prototype = Object.create(stream.Duplex.prototype, {constructor: {value: DuplexWrapper}});
55
56DuplexWrapper.prototype._write = function _write(input, encoding, done) {
57  this._writable.write(input, encoding, done);
58};
59
60DuplexWrapper.prototype._read = function _read() {
61  var buf;
62  var reads = 0;
63  while ((buf = this._readable.read()) !== null) {
64    this.push(buf);
65    reads++;
66  }
67  if (reads === 0) {
68    this._waiting = true;
69  }
70};
71
72module.exports = function duplex2(options, writable, readable) {
73  return new DuplexWrapper(options, writable, readable);
74};
75
76module.exports.DuplexWrapper = DuplexWrapper;
77