• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Symbol,
5} = primordials;
6
7const { setImmediate } = require('timers');
8const assert = require('internal/assert');
9const { Socket } = require('net');
10const { JSStream } = internalBinding('js_stream');
11const uv = internalBinding('uv');
12let debug = require('internal/util/debuglog').debuglog(
13  'stream_socket',
14  (fn) => {
15    debug = fn;
16  },
17);
18const { owner_symbol } = require('internal/async_hooks').symbols;
19const { ERR_STREAM_WRAP } = require('internal/errors').codes;
20
21const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
22const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
23const kPendingShutdownRequest = Symbol('kPendingShutdownRequest');
24const kPendingClose = Symbol('kPendingClose');
25
26function isClosing() { return this[owner_symbol].isClosing(); }
27
28function onreadstart() { return this[owner_symbol].readStart(); }
29
30function onreadstop() { return this[owner_symbol].readStop(); }
31
32function onshutdown(req) { return this[owner_symbol].doShutdown(req); }
33
34function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); }
35
36/* This class serves as a wrapper for when the C++ side of Node wants access
37 * to a standard JS stream. For example, TLS or HTTP do not operate on network
38 * resources conceptually, although that is the common case and what we are
39 * optimizing for; in theory, they are completely composable and can work with
40 * any stream resource they see.
41 *
42 * For the common case, i.e. a TLS socket wrapping around a net.Socket, we
43 * can skip going through the JS layer and let TLS access the raw C++ handle
44 * of a net.Socket. The flipside of this is that, to maintain composability,
45 * we need a way to create "fake" net.Socket instances that call back into a
46 * "real" JavaScript stream. JSStreamSocket is exactly this.
47 */
48class JSStreamSocket extends Socket {
49  constructor(stream) {
50    const handle = new JSStream();
51    handle.close = (cb) => {
52      debug('close');
53      this.doClose(cb);
54    };
55    // Inside of the following functions, `this` refers to the handle
56    // and `this[owner_symbol]` refers to this JSStreamSocket instance.
57    handle.isClosing = isClosing;
58    handle.onreadstart = onreadstart;
59    handle.onreadstop = onreadstop;
60    handle.onshutdown = onshutdown;
61    handle.onwrite = onwrite;
62
63    stream.pause();
64    stream.on('error', (err) => this.emit('error', err));
65    const ondata = (chunk) => {
66      if (typeof chunk === 'string' ||
67          stream.readableObjectMode === true) {
68        // Make sure that no further `data` events will happen.
69        stream.pause();
70        stream.removeListener('data', ondata);
71
72        this.emit('error', new ERR_STREAM_WRAP());
73        return;
74      }
75
76      debug('data', chunk.length);
77      if (this._handle)
78        this._handle.readBuffer(chunk);
79    };
80    stream.on('data', ondata);
81    stream.once('end', () => {
82      debug('end');
83      if (this._handle)
84        this._handle.emitEOF();
85    });
86    // Some `Stream` don't pass `hasError` parameters when closed.
87    stream.once('close', () => {
88      // Errors emitted from `stream` have also been emitted to this instance
89      // so that we don't pass errors to `destroy()` again.
90      this.destroy();
91    });
92
93    super({ handle, manualStart: true });
94    this.stream = stream;
95    this[kCurrentWriteRequest] = null;
96    this[kCurrentShutdownRequest] = null;
97    this[kPendingShutdownRequest] = null;
98    this[kPendingClose] = false;
99    this.readable = stream.readable;
100    this.writable = stream.writable;
101
102    // Start reading.
103    this.read(0);
104  }
105
106  // Allow legacy requires in the test suite to keep working:
107  //   const { StreamWrap } = require('internal/js_stream_socket')
108  static get StreamWrap() {
109    return JSStreamSocket;
110  }
111
112  isClosing() {
113    return !this.readable || !this.writable;
114  }
115
116  readStart() {
117    this.stream.resume();
118    return 0;
119  }
120
121  readStop() {
122    this.stream.pause();
123    return 0;
124  }
125
126  doShutdown(req) {
127    // TODO(addaleax): It might be nice if we could get into a state where
128    // DoShutdown() is not called on streams while a write is still pending.
129    //
130    // Currently, the only part of the code base where that happens is the
131    // TLS implementation, which calls both DoWrite() and DoShutdown() on the
132    // underlying network stream inside of its own DoShutdown() method.
133    // Working around that on the native side is not quite trivial (yet?),
134    // so for now that is supported here.
135
136    if (this[kCurrentWriteRequest] !== null) {
137      this[kPendingShutdownRequest] = req;
138      return 0;
139    }
140
141    assert(this[kCurrentWriteRequest] === null);
142    assert(this[kCurrentShutdownRequest] === null);
143    this[kCurrentShutdownRequest] = req;
144
145    if (this[kPendingClose]) {
146      // If doClose is pending, the stream & this._handle are gone. We can't do
147      // anything. doClose will call finishShutdown with ECANCELED for us shortly.
148      return 0;
149    }
150
151    const handle = this._handle;
152    assert(handle !== null);
153
154    process.nextTick(() => {
155      // Ensure that write is dispatched asynchronously.
156      this.stream.end(() => {
157        this.finishShutdown(handle, 0);
158      });
159    });
160    return 0;
161  }
162
163  // handle === this._handle except when called from doClose().
164  finishShutdown(handle, errCode) {
165    // The shutdown request might already have been cancelled.
166    if (this[kCurrentShutdownRequest] === null)
167      return;
168    const req = this[kCurrentShutdownRequest];
169    this[kCurrentShutdownRequest] = null;
170    handle.finishShutdown(req, errCode);
171  }
172
173  doWrite(req, bufs) {
174    assert(this[kCurrentWriteRequest] === null);
175    assert(this[kCurrentShutdownRequest] === null);
176
177    if (this[kPendingClose]) {
178      // If doClose is pending, the stream & this._handle are gone. We can't do
179      // anything. doClose will call finishWrite with ECANCELED for us shortly.
180      this[kCurrentWriteRequest] = req; // Store req, for doClose to cancel
181      return 0;
182    }
183
184    const handle = this._handle;
185    assert(handle !== null);
186
187    const self = this;
188
189    let pending = bufs.length;
190
191    this.stream.cork();
192    // Use `var` over `let` for performance optimization.
193    // eslint-disable-next-line no-var
194    for (var i = 0; i < bufs.length; ++i)
195      this.stream.write(bufs[i], done);
196    this.stream.uncork();
197
198    // Only set the request here, because the `write()` calls could throw.
199    this[kCurrentWriteRequest] = req;
200
201    function done(err) {
202      if (!err && --pending !== 0)
203        return;
204
205      // Ensure that this is called once in case of error
206      pending = 0;
207
208      let errCode = 0;
209      if (err) {
210        errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
211      }
212
213      // Ensure that write was dispatched
214      setImmediate(() => {
215        self.finishWrite(handle, errCode);
216      });
217    }
218
219    return 0;
220  }
221
222  // handle === this._handle except when called from doClose().
223  finishWrite(handle, errCode) {
224    // The write request might already have been cancelled.
225    if (this[kCurrentWriteRequest] === null)
226      return;
227    const req = this[kCurrentWriteRequest];
228    this[kCurrentWriteRequest] = null;
229
230    handle.finishWrite(req, errCode);
231    if (this[kPendingShutdownRequest]) {
232      const req = this[kPendingShutdownRequest];
233      this[kPendingShutdownRequest] = null;
234      this.doShutdown(req);
235    }
236  }
237
238  doClose(cb) {
239    this[kPendingClose] = true;
240
241    const handle = this._handle;
242
243    // When sockets of the "net" module destroyed, they will call
244    // `this._handle.close()` which will also emit EOF if not emitted before.
245    // This feature makes sockets on the other side emit "end" and "close"
246    // even though we haven't called `end()`. As `stream` are likely to be
247    // instances of `net.Socket`, calling `stream.destroy()` manually will
248    // avoid issues that don't properly close wrapped connections.
249    this.stream.destroy();
250
251    setImmediate(() => {
252      // Should be already set by net.js
253      assert(this._handle === null);
254
255      this.finishWrite(handle, uv.UV_ECANCELED);
256      this.finishShutdown(handle, uv.UV_ECANCELED);
257
258      this[kPendingClose] = false;
259
260      cb();
261    });
262  }
263}
264
265module.exports = JSStreamSocket;
266