'use strict';

const {
  Symbol,
} = primordials;

const { setImmediate } = require('timers');
const assert = require('internal/assert');
const { Socket } = require('net');
const { JSStream } = internalBinding('js_stream');
const uv = internalBinding('uv');
// The callback handed to debuglog() rebinds `debug` to whatever function it
// is given.
let debug = require('internal/util/debuglog').debuglog(
  'stream_socket',
  (fn) => {
    debug = fn;
  },
);
const { owner_symbol } = require('internal/async_hooks').symbols;
const { ERR_STREAM_WRAP } = require('internal/errors').codes;

// Per-instance bookkeeping slots, keyed by symbols.
const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
const kPendingShutdownRequest = Symbol('kPendingShutdownRequest');
const kPendingClose = Symbol('kPendingClose');

// The five functions below get installed on the JSStream handle by the
// JSStreamSocket constructor. When they run, `this` is the handle, and each
// one simply forwards to the matching method of `this[owner_symbol]`.

function isClosing() {
  const owner = this[owner_symbol];
  return owner.isClosing();
}

function onreadstart() {
  const owner = this[owner_symbol];
  return owner.readStart();
}

function onreadstop() {
  const owner = this[owner_symbol];
  return owner.readStop();
}

function onshutdown(req) {
  const owner = this[owner_symbol];
  return owner.doShutdown(req);
}

function onwrite(req, bufs) {
  const owner = this[owner_symbol];
  return owner.doWrite(req, bufs);
}

/* JSStreamSocket is the wrapper used when the C++ side of Node wants access
 * to a standard JS stream. TLS and HTTP, for example, do not conceptually
 * operate on network resources, even though that is the common case and the
 * one being optimized for; in theory they are fully composable and can work
 * with any stream resource they are given.
 *
 * In the common case (a TLS socket wrapping a net.Socket) the JS layer can be
 * skipped and TLS can use the raw C++ handle of the net.Socket directly. The
 * price for keeping composability is that there must be a way to create
 * "fake" net.Socket instances that call back into a "real" JavaScript
 * stream. That is what this class provides.
 */
class JSStreamSocket extends Socket {
  /**
   * @param {object} stream The JS stream to wrap. It is paused, subscribed to
   *   ('error', 'data', 'end', 'close'), and later written to, ended and
   *   destroyed through this socket.
   */
  constructor(stream) {
    const handle = new JSStream();

    // The arrow functions created before `super()` only touch `this` once
    // they are invoked, i.e. after construction has completed.
    handle.close = (cb) => {
      debug('close');
      this.doClose(cb);
    };
    // Inside of the following functions, `this` refers to the handle
    // and `this[owner_symbol]` refers to this JSStreamSocket instance.
    handle.isClosing = isClosing;
    handle.onreadstart = onreadstart;
    handle.onreadstop = onreadstop;
    handle.onshutdown = onshutdown;
    handle.onwrite = onwrite;

    stream.pause();

    // Errors of the wrapped stream are re-emitted on this socket.
    stream.on('error', (err) => this.emit('error', err));

    const onStreamData = (chunk) => {
      const unsupportedChunk =
        typeof chunk === 'string' || stream.readableObjectMode === true;

      if (unsupportedChunk) {
        // Make sure that no further `data` events will happen.
        stream.pause();
        stream.removeListener('data', onStreamData);

        this.emit('error', new ERR_STREAM_WRAP());
        return;
      }

      debug('data', chunk.length);
      const currentHandle = this._handle;
      if (currentHandle) {
        currentHandle.readBuffer(chunk);
      }
    };
    stream.on('data', onStreamData);

    stream.once('end', () => {
      debug('end');
      const currentHandle = this._handle;
      if (currentHandle) {
        currentHandle.emitEOF();
      }
    });

    // Some `Stream` don't pass `hasError` parameters when closed.
    stream.once('close', () => {
      // Whatever error `stream` emitted has already been forwarded to this
      // instance, so `destroy()` is called without an error argument.
      this.destroy();
    });

    super({ handle, manualStart: true });

    this.stream = stream;
    this[kCurrentWriteRequest] = null;
    this[kCurrentShutdownRequest] = null;
    this[kPendingShutdownRequest] = null;
    this[kPendingClose] = false;
    this.readable = stream.readable;
    this.writable = stream.writable;

    // Start reading.
    this.read(0);
  }

  // Allow legacy requires in the test suite to keep working:
  //   const { StreamWrap } = require('internal/js_stream_socket')
  static get StreamWrap() {
    return JSStreamSocket;
  }

  /**
   * @returns {boolean} `true` unless both `readable` and `writable` are
   *   truthy.
   */
  isClosing() {
    return !(this.readable && this.writable);
  }

  /**
   * Resumes the wrapped stream.
   * @returns {number} Always 0.
   */
  readStart() {
    this.stream.resume();
    return 0;
  }

  /**
   * Pauses the wrapped stream.
   * @returns {number} Always 0.
   */
  readStop() {
    this.stream.pause();
    return 0;
  }

  /**
   * Handles a shutdown request coming from the handle. The request is parked
   * while a write is in flight; otherwise the wrapped stream is ended on the
   * next tick and completion is reported through `finishShutdown()`.
   * @param {object} req The shutdown request.
   * @returns {number} Always 0.
   */
  doShutdown(req) {
    // TODO(addaleax): It might be nice if we could get into a state where
    // DoShutdown() is not called on streams while a write is still pending.
    //
    // Currently, the only part of the code base where that happens is the
    // TLS implementation, which calls both DoWrite() and DoShutdown() on the
    // underlying network stream inside of its own DoShutdown() method.
    // Working around that on the native side is not quite trivial (yet?),
    // so for now that is supported here.

    const writeInFlight = this[kCurrentWriteRequest] !== null;
    if (writeInFlight) {
      // finishWrite() picks this up once the write has been completed.
      this[kPendingShutdownRequest] = req;
      return 0;
    }

    assert(this[kCurrentWriteRequest] === null);
    assert(this[kCurrentShutdownRequest] === null);
    this[kCurrentShutdownRequest] = req;

    if (this[kPendingClose]) {
      // If doClose is pending, the stream & this._handle are gone. We can't do
      // anything. doClose will call finishShutdown with ECANCELED for us shortly.
      return 0;
    }

    const handle = this._handle;
    assert(handle !== null);

    const reportShutdown = () => {
      this.finishShutdown(handle, 0);
    };

    process.nextTick(() => {
      // Ensure that write is dispatched asynchronously.
      this.stream.end(reportShutdown);
    });
    return 0;
  }

  /**
   * Completes the current shutdown request, if there still is one.
   * `handle === this._handle` except when called from doClose().
   * @param {object} handle Handle on which `finishShutdown()` is invoked.
   * @param {number} errCode Status code passed on to the handle.
   */
  finishShutdown(handle, errCode) {
    const shutdownReq = this[kCurrentShutdownRequest];

    // The shutdown request might already have been cancelled.
    if (shutdownReq === null) {
      return;
    }

    this[kCurrentShutdownRequest] = null;
    handle.finishShutdown(shutdownReq, errCode);
  }

  /**
   * Handles a write request coming from the handle by writing every buffer
   * to the wrapped stream inside a single cork()/uncork() pair. Completion is
   * reported through `finishWrite()` from a `setImmediate()` callback.
   * @param {object} req The write request.
   * @param {Array} bufs The chunks to pass to `stream.write()`.
   * @returns {number} Always 0.
   */
  doWrite(req, bufs) {
    assert(this[kCurrentWriteRequest] === null);
    assert(this[kCurrentShutdownRequest] === null);

    if (this[kPendingClose]) {
      // If doClose is pending, the stream & this._handle are gone. We can't do
      // anything. doClose will call finishWrite with ECANCELED for us shortly.
      this[kCurrentWriteRequest] = req; // Store req, for doClose to cancel
      return 0;
    }

    const handle = this._handle;
    assert(handle !== null);

    // Number of write callbacks that have not reported success yet.
    let pending = bufs.length;

    const done = (err) => {
      if (!err) {
        pending -= 1;
        if (pending !== 0) {
          return;
        }
      }

      // Ensure that this is called once in case of error
      pending = 0;

      // Translate the error's `code` through the uv binding; UV_EPIPE is the
      // fallback when that lookup yields nothing truthy.
      const errCode = err ? (uv[`UV_${err.code}`] || uv.UV_EPIPE) : 0;

      // Ensure that write was dispatched
      setImmediate(() => {
        this.finishWrite(handle, errCode);
      });
    };

    this.stream.cork();
    // Use `var` over `let` for performance optimization.
    // eslint-disable-next-line no-var
    for (var i = 0; i < bufs.length; ++i) {
      this.stream.write(bufs[i], done);
    }
    this.stream.uncork();

    // Only set the request here, because the `write()` calls could throw.
    this[kCurrentWriteRequest] = req;

    return 0;
  }

  /**
   * Completes the current write request, if there still is one, and then
   * starts a shutdown that was parked by `doShutdown()` in the meantime.
   * `handle === this._handle` except when called from doClose().
   * @param {object} handle Handle on which `finishWrite()` is invoked.
   * @param {number} errCode Status code passed on to the handle.
   */
  finishWrite(handle, errCode) {
    const writeReq = this[kCurrentWriteRequest];

    // The write request might already have been cancelled.
    if (writeReq === null) {
      return;
    }

    this[kCurrentWriteRequest] = null;
    handle.finishWrite(writeReq, errCode);

    const queuedShutdown = this[kPendingShutdownRequest];
    if (queuedShutdown) {
      this[kPendingShutdownRequest] = null;
      this.doShutdown(queuedShutdown);
    }
  }

  /**
   * Implements `handle.close()`: destroys the wrapped stream right away and,
   * from a `setImmediate()` callback, cancels any outstanding write and
   * shutdown requests with UV_ECANCELED before calling `cb`.
   * @param {Function} cb Called with no arguments once the requests have
   *   been cancelled.
   */
  doClose(cb) {
    this[kPendingClose] = true;

    // Captured now; by the time the immediate runs `this._handle` is
    // expected to be null (asserted below).
    const handle = this._handle;

    // When sockets of the "net" module destroyed, they will call
    // `this._handle.close()` which will also emit EOF if not emitted before.
    // This feature makes sockets on the other side emit "end" and "close"
    // even though we haven't called `end()`. As `stream` are likely to be
    // instances of `net.Socket`, calling `stream.destroy()` manually will
    // avoid issues that don't properly close wrapped connections.
    this.stream.destroy();

    setImmediate(() => {
      // Should be already set by net.js
      assert(this._handle === null);

      this.finishWrite(handle, uv.UV_ECANCELED);
      this.finishShutdown(handle, uv.UV_ECANCELED);

      this[kPendingClose] = false;

      cb();
    });
  }
}

module.exports = JSStreamSocket;