• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Symbol,
5} = primordials;
6
7const { setImmediate } = require('timers');
8const assert = require('internal/assert');
9const { Socket } = require('net');
10const { JSStream } = internalBinding('js_stream');
11const uv = internalBinding('uv');
12let debug = require('internal/util/debuglog').debuglog(
13  'stream_socket',
14  (fn) => {
15    debug = fn;
16  },
17);
18const { owner_symbol } = require('internal/async_hooks').symbols;
19const { ERR_STREAM_WRAP } = require('internal/errors').codes;
20
21const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
22const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
23const kPendingShutdownRequest = Symbol('kPendingShutdownRequest');
24
25function isClosing() { return this[owner_symbol].isClosing(); }
26
27function onreadstart() { return this[owner_symbol].readStart(); }
28
29function onreadstop() { return this[owner_symbol].readStop(); }
30
31function onshutdown(req) { return this[owner_symbol].doShutdown(req); }
32
33function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); }
34
35/* This class serves as a wrapper for when the C++ side of Node wants access
36 * to a standard JS stream. For example, TLS or HTTP do not operate on network
37 * resources conceptually, although that is the common case and what we are
38 * optimizing for; in theory, they are completely composable and can work with
39 * any stream resource they see.
40 *
41 * For the common case, i.e. a TLS socket wrapping around a net.Socket, we
42 * can skip going through the JS layer and let TLS access the raw C++ handle
43 * of a net.Socket. The flipside of this is that, to maintain composability,
44 * we need a way to create "fake" net.Socket instances that call back into a
45 * "real" JavaScript stream. JSStreamSocket is exactly this.
46 */
47class JSStreamSocket extends Socket {
48  constructor(stream) {
49    const handle = new JSStream();
50    handle.close = (cb) => {
51      debug('close');
52      this.doClose(cb);
53    };
54    // Inside of the following functions, `this` refers to the handle
55    // and `this[owner_symbol]` refers to this JSStreamSocket instance.
56    handle.isClosing = isClosing;
57    handle.onreadstart = onreadstart;
58    handle.onreadstop = onreadstop;
59    handle.onshutdown = onshutdown;
60    handle.onwrite = onwrite;
61
62    stream.pause();
63    stream.on('error', (err) => this.emit('error', err));
64    const ondata = (chunk) => {
65      if (typeof chunk === 'string' ||
66          stream.readableObjectMode === true) {
67        // Make sure that no further `data` events will happen.
68        stream.pause();
69        stream.removeListener('data', ondata);
70
71        this.emit('error', new ERR_STREAM_WRAP());
72        return;
73      }
74
75      debug('data', chunk.length);
76      if (this._handle)
77        this._handle.readBuffer(chunk);
78    };
79    stream.on('data', ondata);
80    stream.once('end', () => {
81      debug('end');
82      if (this._handle)
83        this._handle.emitEOF();
84    });
85    // Some `Stream` don't pass `hasError` parameters when closed.
86    stream.once('close', () => {
87      // Errors emitted from `stream` have also been emitted to this instance
88      // so that we don't pass errors to `destroy()` again.
89      this.destroy();
90    });
91
92    super({ handle, manualStart: true });
93    this.stream = stream;
94    this[kCurrentWriteRequest] = null;
95    this[kCurrentShutdownRequest] = null;
96    this[kPendingShutdownRequest] = null;
97    this.readable = stream.readable;
98    this.writable = stream.writable;
99
100    // Start reading.
101    this.read(0);
102  }
103
104  // Allow legacy requires in the test suite to keep working:
105  //   const { StreamWrap } = require('internal/js_stream_socket')
106  static get StreamWrap() {
107    return JSStreamSocket;
108  }
109
110  isClosing() {
111    return !this.readable || !this.writable;
112  }
113
114  readStart() {
115    this.stream.resume();
116    return 0;
117  }
118
119  readStop() {
120    this.stream.pause();
121    return 0;
122  }
123
124  doShutdown(req) {
125    // TODO(addaleax): It might be nice if we could get into a state where
126    // DoShutdown() is not called on streams while a write is still pending.
127    //
128    // Currently, the only part of the code base where that happens is the
129    // TLS implementation, which calls both DoWrite() and DoShutdown() on the
130    // underlying network stream inside of its own DoShutdown() method.
131    // Working around that on the native side is not quite trivial (yet?),
132    // so for now that is supported here.
133
134    if (this[kCurrentWriteRequest] !== null) {
135      this[kPendingShutdownRequest] = req;
136      return 0;
137    }
138    assert(this[kCurrentWriteRequest] === null);
139    assert(this[kCurrentShutdownRequest] === null);
140    this[kCurrentShutdownRequest] = req;
141
142    const handle = this._handle;
143
144    process.nextTick(() => {
145      // Ensure that write is dispatched asynchronously.
146      this.stream.end(() => {
147        this.finishShutdown(handle, 0);
148      });
149    });
150    return 0;
151  }
152
153  // handle === this._handle except when called from doClose().
154  finishShutdown(handle, errCode) {
155    // The shutdown request might already have been cancelled.
156    if (this[kCurrentShutdownRequest] === null)
157      return;
158    const req = this[kCurrentShutdownRequest];
159    this[kCurrentShutdownRequest] = null;
160    handle.finishShutdown(req, errCode);
161  }
162
163  doWrite(req, bufs) {
164    assert(this[kCurrentWriteRequest] === null);
165    assert(this[kCurrentShutdownRequest] === null);
166
167    const handle = this._handle;
168    const self = this;
169
170    let pending = bufs.length;
171
172    this.stream.cork();
173    // Use `var` over `let` for performance optimization.
174    // eslint-disable-next-line no-var
175    for (var i = 0; i < bufs.length; ++i)
176      this.stream.write(bufs[i], done);
177    this.stream.uncork();
178
179    // Only set the request here, because the `write()` calls could throw.
180    this[kCurrentWriteRequest] = req;
181
182    function done(err) {
183      if (!err && --pending !== 0)
184        return;
185
186      // Ensure that this is called once in case of error
187      pending = 0;
188
189      let errCode = 0;
190      if (err) {
191        errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
192      }
193
194      // Ensure that write was dispatched
195      setImmediate(() => {
196        self.finishWrite(handle, errCode);
197      });
198    }
199
200    return 0;
201  }
202
203  // handle === this._handle except when called from doClose().
204  finishWrite(handle, errCode) {
205    // The write request might already have been cancelled.
206    if (this[kCurrentWriteRequest] === null)
207      return;
208    const req = this[kCurrentWriteRequest];
209    this[kCurrentWriteRequest] = null;
210
211    handle.finishWrite(req, errCode);
212    if (this[kPendingShutdownRequest]) {
213      const req = this[kPendingShutdownRequest];
214      this[kPendingShutdownRequest] = null;
215      this.doShutdown(req);
216    }
217  }
218
219  doClose(cb) {
220    const handle = this._handle;
221
222    // When sockets of the "net" module destroyed, they will call
223    // `this._handle.close()` which will also emit EOF if not emitted before.
224    // This feature makes sockets on the other side emit "end" and "close"
225    // even though we haven't called `end()`. As `stream` are likely to be
226    // instances of `net.Socket`, calling `stream.destroy()` manually will
227    // avoid issues that don't properly close wrapped connections.
228    this.stream.destroy();
229
230    setImmediate(() => {
231      // Should be already set by net.js
232      assert(this._handle === null);
233
234      this.finishWrite(handle, uv.UV_ECANCELED);
235      this.finishShutdown(handle, uv.UV_ECANCELED);
236
237      cb();
238    });
239  }
240}
241
242module.exports = JSStreamSocket;
243